From 549c0967f552ea26aec835bf83bf850cbf333424 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 12 Jul 2020 15:42:29 -0700 Subject: [PATCH 1/7] Coalesce bucketed table for shuffled hash join if applicable --- .../apache/spark/sql/internal/SQLConf.scala | 20 +- .../spark/sql/execution/QueryExecution.scala | 4 +- .../bucketing/CoalesceBucketsInJoin.scala | 172 ++++++++++++++++++ .../CoalesceBucketsInSortMergeJoin.scala | 132 -------------- .../org/apache/spark/sql/ExplainSuite.scala | 2 +- ...scala => CoalesceBucketsInJoinSuite.scala} | 120 +++++++++--- .../spark/sql/sources/BucketedReadSuite.scala | 14 +- 7 files changed, 287 insertions(+), 177 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala rename sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/{CoalesceBucketsInSortMergeJoinSuite.scala => CoalesceBucketsInJoinSuite.scala} (58%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 89c41f31ff234..06fbb0a31c005 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2638,21 +2638,22 @@ object SQLConf { .booleanConf .createWithDefault(true) - val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED = - buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled") + val COALESCE_BUCKETS_IN_JOIN_ENABLED = + buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled") .doc("When true, if two bucketed tables with different numbers of buckets are joined, " + "the side with a bigger number of buckets will be coalesced to have the same number " + - "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " + - "and only when the bigger number of buckets is divisible by the smaller number of buckets.") + "of buckets as the other side. The bigger number of buckets must be divisible by the " + + "smaller number of buckets. Bucket coalescing is applied to sort-merge joins and " + + "shuffled hash joins.") .version("3.1.0") .booleanConf .createWithDefault(false) - val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio") + val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced should be less than or " + "equal to this value for bucket coalescing to be applied. This configuration only " + - s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.") + s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) @@ -3269,6 +3270,11 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) + + def coalesceBucketsInJoinMaxBucketRatio: Int = + getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 791e432269632..e4b9322016cf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} -import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin +import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -332,7 +332,7 @@ object QueryExecution { // as the original plan is hidden behind `AdaptiveSparkPlanExec`. adaptiveExecutionRule.toSeq ++ Seq( - CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf), + CoalesceBucketsInJoin(sparkSession.sessionState.conf), PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), EnsureRequirements(sparkSession.sessionState.conf), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala new file mode 100644 index 0000000000000..16344e8ec2724 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import scala.annotation.tailrec + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin` + * if the following conditions are met: + * - Two bucketed tables are joined. 
+ * - Join keys match with output partition expressions on their respective sides. + * - The larger bucket number is divisible by the smaller bucket number. + * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. + * - The ratio of the number of buckets is less than the value set in + * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. + */ +case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { + private def updateNumCoalescedBuckets( + join: BaseJoinExec, + numLeftBuckets: Int, + numRightBucket: Int, + numCoalescedBuckets: Int): BaseJoinExec = { + if (numCoalescedBuckets != numLeftBuckets) { + val leftCoalescedChild = join.left transformUp { + case f: FileSourceScanExec => + f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + } + join match { + case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) + case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) + } + } else { + val rightCoalescedChild = join.right transformUp { + case f: FileSourceScanExec => + f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + } + join match { + case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) + case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) + } + } + } + + private def isCoalesceSHJStreamSide( + join: ShuffledHashJoinExec, + numLeftBuckets: Int, + numRightBucket: Int, + numCoalescedBuckets: Int): Boolean = { + if (numCoalescedBuckets == numLeftBuckets) { + join.buildSide != BuildRight + } else { + join.buildSide != BuildLeft + } + } + + def apply(plan: SparkPlan): SparkPlan = { + if (!conf.coalesceBucketsInJoinEnabled) { + return plan + } + + plan transform { + case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) + if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= + conf.coalesceBucketsInJoinMaxBucketRatio => + val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) + join match { + case j: SortMergeJoinExec => + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + case j: ShuffledHashJoinExec + // Only coalesce the buckets for shuffled hash join stream side, + // to avoid OOM for build side. + if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + case other => other + } + case other => other + } + } +} + +/** + * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoinExec`, + * where both sides of the join are bucketed tables, + * each side consists of only the scan operation (optionally under filters and projects), + * and the numbers of buckets are not equal but divisible. + */ +object ExtractJoinWithBuckets { + @tailrec + private def isScanOperation(plan: SparkPlan): Boolean = plan match { + case f: FilterExec => isScanOperation(f.child) + case p: ProjectExec => isScanOperation(p.child) + case _: FileSourceScanExec => true + case _ => false + } + + private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { + plan.collectFirst { + case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && + f.optionalNumCoalescedBuckets.isEmpty => + f.relation.bucketSpec.get + } + } + + /** + * The join keys should match with expressions for output partitioning. Note that + * the ordering does not matter because it will be handled in `EnsureRequirements`.
+ */ + private def satisfiesOutputPartitioning( + keys: Seq[Expression], + partitioning: Partitioning): Boolean = { + partitioning match { + case HashPartitioning(exprs, _) if exprs.length == keys.length => + exprs.forall(e => keys.exists(_.semanticEquals(e))) + case _ => false + } + } + + private def isApplicable(j: BaseJoinExec): Boolean = { + (j.isInstanceOf[SortMergeJoinExec] || + j.isInstanceOf[ShuffledHashJoinExec]) && + isScanOperation(j.left) && + isScanOperation(j.right) && + satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) && + satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning) + } + + private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = { + val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) + // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller + // number of buckets because bucket id is calculated by modding the total number of buckets. + numBuckets1 != numBuckets2 && large % small == 0 + } + + def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = { + plan match { + case s: BaseJoinExec if isApplicable(s) => + val leftBucket = getBucketSpec(s.left) + val rightBucket = getBucketSpec(s.right) + if (leftBucket.isDefined && rightBucket.isDefined && + isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) { + Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets) + } else { + None + } + case _ => None + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala deleted file mode 100644 index 3bb0597ecd87c..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.bucketing - -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} -import org.apache.spark.sql.execution.joins.SortMergeJoinExec -import org.apache.spark.sql.internal.SQLConf - -/** - * This rule coalesces one side of the `SortMergeJoin` if the following conditions are met: - * - Two bucketed tables are joined. - * - Join keys match with output partition expressions on their respective sides. - * - The larger bucket number is divisible by the smaller bucket number. 
- * - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true. - * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO. - */ -case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends Rule[SparkPlan] { - private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = { - assert(numBuckets1 != numBuckets2) - val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) - // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller - // number of buckets because bucket id is calculated by modding the total number of buckets. - if (large % small == 0 && - large / small <= conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) { - Some(small) - } else { - None - } - } - - private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: Int): SparkPlan = { - plan.transformUp { - case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) - } - } - - def apply(plan: SparkPlan): SparkPlan = { - if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) { - return plan - } - - plan transform { - case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, numRightBuckets) - if numLeftBuckets != numRightBuckets => - mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { numCoalescedBuckets => - if (numCoalescedBuckets != numLeftBuckets) { - smj.copy(left = updateNumCoalescedBuckets(smj.left, numCoalescedBuckets)) - } else { - smj.copy(right = updateNumCoalescedBuckets(smj.right, numCoalescedBuckets)) - } - }.getOrElse(smj) - case other => other - } - } -} - -/** - * An extractor that extracts `SortMergeJoinExec` where both sides of the join have the bucketed - * tables and are consisted of only the scan operation. - */ -object ExtractSortMergeJoinWithBuckets { - private def isScanOperation(plan: SparkPlan): Boolean = plan match { - case f: FilterExec => isScanOperation(f.child) - case p: ProjectExec => isScanOperation(p.child) - case _: FileSourceScanExec => true - case _ => false - } - - private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { - plan.collectFirst { - case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && - f.optionalNumCoalescedBuckets.isEmpty => - f.relation.bucketSpec.get - } - } - - /** - * The join keys should match with expressions for output partitioning. Note that - * the ordering does not matter because it will be handled in `EnsureRequirements`. 
- */ - private def satisfiesOutputPartitioning( - keys: Seq[Expression], - partitioning: Partitioning): Boolean = { - partitioning match { - case HashPartitioning(exprs, _) if exprs.length == keys.length => - exprs.forall(e => keys.exists(_.semanticEquals(e))) - case _ => false - } - } - - private def isApplicable(s: SortMergeJoinExec): Boolean = { - isScanOperation(s.left) && - isScanOperation(s.right) && - satisfiesOutputPartitioning(s.leftKeys, s.left.outputPartitioning) && - satisfiesOutputPartitioning(s.rightKeys, s.right.outputPartitioning) - } - - def unapply(plan: SparkPlan): Option[(SortMergeJoinExec, Int, Int)] = { - plan match { - case s: SortMergeJoinExec if isApplicable(s) => - val leftBucket = getBucketSpec(s.left) - val rightBucket = getBucketSpec(s.right) - if (leftBucket.isDefined && rightBucket.isDefined) { - Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets) - } else { - None - } - case _ => None - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 70303792fdf1a..e43a4147ceb63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -347,7 +347,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite test("Coalesced bucket info should be a part of explain string") { withTable("t1", "t2") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") val df1 = spark.table("t1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala similarity index 58% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index 6a70045c55e64..e4eaf1838f81e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -19,17 +19,21 @@ package org.apache.spark.sql.execution.bucketing import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.optimizer.BuildLeft +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import 
org.apache.spark.sql.types.{IntegerType, StructType} -class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkSession { +class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { + private val sortMergeJoin = "sortMergeJoin" + private val shuffledHashJoin = "shuffledHashJoin" + private val broadcastHashJoin = "broadcastHashJoin" + case class RelationSetting( cols: Seq[Attribute], numBuckets: Int, @@ -47,11 +51,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys: Seq[Attribute], leftRelation: RelationSetting, rightRelation: RelationSetting, - isSortMergeJoin: Boolean) + joinOperator: String, + shjBuildSide: Option[BuildSide]) object JoinSetting { - def apply(l: RelationSetting, r: RelationSetting, isSortMergeJoin: Boolean): JoinSetting = { - JoinSetting(l.cols, r.cols, l, r, isSortMergeJoin) + def apply( + l: RelationSetting, + r: RelationSetting, + joinOperator: String = sortMergeJoin, + shjBuildSide: Option[BuildSide] = None): JoinSetting = { + JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide) } } @@ -73,17 +82,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS leftRelation = setting.rightRelation, rightRelation = setting.leftRelation) - Seq(setting, swappedSetting).foreach { case s => + val settings = if (setting.joinOperator != shuffledHashJoin) { + Seq(setting, swappedSetting) + } else { + Seq(setting) + } + settings.foreach { s => val lScan = newFileSourceScanExec(s.leftRelation) val rScan = newFileSourceScanExec(s.rightRelation) - val join = if (s.isSortMergeJoin) { + val join = if (s.joinOperator == sortMergeJoin) { SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan) + } else if (s.joinOperator == shuffledHashJoin) { + ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan) } else { BroadcastHashJoinExec( s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan) } - val plan = CoalesceBucketsInSortMergeJoin(spark.sessionState.conf)(join) + val plan = CoalesceBucketsInJoin(spark.sessionState.conf)(join) def verify(expected: Option[Int], subPlan: SparkPlan): Unit = { val coalesced = subPlan.collect { @@ -91,7 +107,7 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS f.optionalNumCoalescedBuckets.get } if (expected.isDefined) { - assert(coalesced.size == 1 && coalesced(0) == expected.get) + assert(coalesced.size == 1 && coalesced.head == expected.get) } else { assert(coalesced.isEmpty) } @@ -103,46 +119,67 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS } test("bucket coalescing - basic") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) + // Coalescing bucket should not happen when the target is on shuffled hash join + // build side. 
run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true)) + RelationSetting(4, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildRight))) } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") { - run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin)) } } - test("bucket coalescing should work only for sort merge join") { + test("bucket coalescing should work only for sort merge join and shuffled hash join") { Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> enabled.toString) { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) { run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = false)) + RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin)) } } } test("bucket coalescing shouldn't be applied when the number of buckets are the same") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { - run(JoinSetting(RelationSetting(8, None), RelationSetting(8, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = sortMergeJoin)) + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) } } test("number of bucket is not divisible by other number of bucket") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { - run(JoinSetting(RelationSetting(3, None), RelationSetting(8, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = sortMergeJoin)) + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) } } test("the ratio of the number of buckets is greater than max allowed") { withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting(RelationSetting(4, None), RelationSetting(16, None), isSortMergeJoin = true)) + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) } } test("join keys should match with output partitioning") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { val lCols = Seq( AttributeReference("l1", IntegerType)(), AttributeReference("l2", IntegerType)()) @@ -160,7 +197,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = Seq(rCols.head), leftRelation = lRel, rightRelation = rRel, - isSortMergeJoin = true)) + joinOperator = sortMergeJoin, 
+ shjBuildSide = None)) + + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) // The following should not be coalesced because join keys do not match with output // partitioning (more expressions). @@ -169,7 +215,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), leftRelation = lRel, rightRelation = rRel, - isSortMergeJoin = true)) + joinOperator = sortMergeJoin, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) // The following will be coalesced since ordering should not matter because it will be // adjusted in `EnsureRequirements`. @@ -178,7 +233,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = rCols.reverse, leftRelation = lRel, rightRelation = RelationSetting(rCols, 8, Some(4)), - isSortMergeJoin = true)) + joinOperator = sortMergeJoin, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = lCols.reverse, + rightKeys = rCols.reverse, + leftRelation = lRel, + rightRelation = RelationSetting(rCols, 8, Some(4)), + joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index b6767eb3132ea..98886d271e977 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -876,7 +876,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket coalescing eliminates shuffle") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. // Currently, sort will be introduced for the side that is coalesced. val testSpec1 = BucketedTableTestSpec( @@ -911,7 +911,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { // Coalescing buckets is disabled by a config. run( BucketedTableTestSpec( @@ -921,8 +921,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing buckets is not applied because the ratio of the number of buckets (3) // is greater than max allowed (2). 
run( @@ -932,7 +932,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { run( // Coalescing buckets is not applied because the bigger number of buckets (8) is not // divisible by the smaller number of buckets (7). @@ -950,7 +950,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { def verify( query: String, expectedNumShuffles: Int, @@ -964,7 +964,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) - assert(scans(0).optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) } else { assert(scans.isEmpty) } From 5cf56d706d24bd995dc1a4b2635e493cd77f6aff Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 15 Jul 2020 02:32:21 -0700 Subject: [PATCH 2/7] Separate max bucket ratio for SMJ and SHJ and add OOM related documentation --- .../apache/spark/sql/internal/SQLConf.scala | 23 +++++++++++++++---- .../bucketing/CoalesceBucketsInJoin.scala | 15 +++++++----- .../CoalesceBucketsInJoinSuite.scala | 18 ++++++++------- .../spark/sql/sources/BucketedReadSuite.scala | 2 +- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 06fbb0a31c005..5561a1d0c3e46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2649,8 +2649,8 @@ object SQLConf { .booleanConf .createWithDefault(false) - val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") + val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced should be less than or " + "equal to this value for bucket coalescing to be applied. This configuration only " + s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") @@ -2659,6 +2659,18 @@ object SQLConf { .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) + val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio") + .doc("The ratio of the number of two buckets being coalesced should be less than or " + + "equal to this value for bucket coalescing to be applied. This configuration only " + + s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. 
" + + "Note as coalescing reduces parallelism, there might be a higher risk for " + + "out of memory error at shuffled hash join build side.") + .version("3.1.0") + .intConf + .checkValue(_ > 0, "The difference must be positive.") + .createWithDefault(4) + val BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT = buildConf("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit") .internal() @@ -3272,8 +3284,11 @@ class SQLConf extends Serializable with Logging { def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) - def coalesceBucketsInJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int = + getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO) + + def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int = + getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 16344e8ec2724..7acad9e7b7187 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.internal.SQLConf * - The larger bucket number is divisible by the smaller bucket number. * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. + * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or, + * COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`). */ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { private def updateNumCoalescedBuckets( @@ -83,17 +84,19 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { } plan transform { - case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) - if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= - conf.coalesceBucketsInJoinMaxBucketRatio => + case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) => + val bucketRatio = math.max(numLeftBuckets, numRightBuckets) / + math.min(numLeftBuckets, numRightBuckets) val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) join match { - case j: SortMergeJoinExec => + case j: SortMergeJoinExec + if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio => updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) case j: ShuffledHashJoinExec // Only coalesce the buckets for shuffled hash join stream side, // to avoid OOM for build side. 
- if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => + if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio && + isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) case other => other } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index e4eaf1838f81e..588e74ff3adeb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -167,14 +167,16 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { } test("the ratio of the number of buckets is greater than max allowed") { - withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin)) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin, - shjBuildSide = Some(BuildLeft))) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin)) + } + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin, + shjBuildSide = Some(BuildLeft))) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 98886d271e977..7068c7ca92013 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf( SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing buckets is not applied because the ratio of the number of buckets (3) // is greater than max allowed (2). 
run( From cbd2fa18659f69a354de36bdf282601809fa0cf0 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 15 Jul 2020 09:47:06 -0700 Subject: [PATCH 3/7] Set smaller default config value for SHJ --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5561a1d0c3e46..94e9fae59273f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2669,7 +2669,7 @@ object SQLConf { .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") - .createWithDefault(4) + .createWithDefault(2) val BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT = buildConf("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit") From d6c9d8826f640915f6814d1130bc6db3bed96909 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 16 Jul 2020 22:50:26 -0700 Subject: [PATCH 4/7] Address all comments beside the separate configs discussion --- .../bucketing/CoalesceBucketsInJoin.scala | 29 ++++--- .../CoalesceBucketsInJoinSuite.scala | 76 ++++++++++++------- 2 files changed, 65 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 7acad9e7b7187..607eb6fd5661f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -40,25 +40,30 @@ import org.apache.spark.sql.internal.SQLConf * COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`). 
*/ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { + private def updateNumCoalescedBucketsInScan( + plan: SparkPlan, + numCoalescedBuckets: Int): SparkPlan = { + plan transformUp { + case f: FileSourceScanExec => + f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + } + } + private def updateNumCoalescedBuckets( join: BaseJoinExec, numLeftBuckets: Int, numRightBucket: Int, numCoalescedBuckets: Int): BaseJoinExec = { if (numCoalescedBuckets != numLeftBuckets) { - val leftCoalescedChild = join.left transformUp { - case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) - } + val leftCoalescedChild = + updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets) join match { case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) } } else { - val rightCoalescedChild = join.right transformUp { - case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) - } + val rightCoalescedChild = + updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets) join match { case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) @@ -160,12 +165,12 @@ object ExtractJoinWithBuckets { def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = { plan match { - case s: BaseJoinExec if isApplicable(s) => - val leftBucket = getBucketSpec(s.left) - val rightBucket = getBucketSpec(s.right) + case j: BaseJoinExec if isApplicable(j) => + val leftBucket = getBucketSpec(j.left) + val rightBucket = getBucketSpec(j.right) if (leftBucket.isDefined && rightBucket.isDefined && isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) { - Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets) + Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets) } else { None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index 588e74ff3adeb..317a34e5157c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.sql.types.{IntegerType, StructType} class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { - private val sortMergeJoin = "sortMergeJoin" - private val shuffledHashJoin = "shuffledHashJoin" - private val broadcastHashJoin = "broadcastHashJoin" + private val SORT_MERGE_JOIN = "sortMergeJoin" + private val SHUFFLED_HASH_JOIN = "shuffledHashJoin" + private val BROADCAST_HASH_JOIN = "broadcastHashJoin" case class RelationSetting( cols: Seq[Attribute], @@ -58,7 +58,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { def apply( l: RelationSetting, r: RelationSetting, - joinOperator: String = sortMergeJoin, + joinOperator: String = SORT_MERGE_JOIN, shjBuildSide: Option[BuildSide] = None): JoinSetting = { JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide) } @@ -82,7 +82,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { leftRelation = setting.rightRelation, rightRelation = setting.leftRelation) - val settings = if 
(setting.joinOperator != shuffledHashJoin) { + val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN) { Seq(setting, swappedSetting) } else { Seq(setting) @@ -90,9 +90,9 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { settings.foreach { s => val lScan = newFileSourceScanExec(s.leftRelation) val rScan = newFileSourceScanExec(s.rightRelation) - val join = if (s.joinOperator == sortMergeJoin) { + val join = if (s.joinOperator == SORT_MERGE_JOIN) { SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan) - } else if (s.joinOperator == shuffledHashJoin) { + } else if (s.joinOperator == SHUFFLED_HASH_JOIN) { ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan) } else { BroadcastHashJoinExec( @@ -121,19 +121,23 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { test("bucket coalescing - basic") { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin)) + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin, + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) - // Coalescing bucket should not happen when the target is on shuffled hash join - // build side. - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, - shjBuildSide = Some(BuildRight))) } + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin)) + RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) } } @@ -141,17 +145,25 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { Seq(true, false).foreach { enabled => withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) { run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin)) + RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) } } } + test("bucket coalescing shouldn't be applied to shuffled hash join build side") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) + } + } + test("bucket coalescing shouldn't be applied when the number of buckets are the same") { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = sortMergeJoin)) + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, + RelationSetting(8, None), 
RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) } } @@ -159,9 +171,9 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { test("number of bucket is not divisible by other number of bucket") { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = sortMergeJoin)) + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = shuffledHashJoin, + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) } } @@ -170,11 +182,11 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin)) + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) } withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") { run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin, + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) } } @@ -199,7 +211,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = Seq(rCols.head), leftRelation = lRel, rightRelation = rRel, - joinOperator = sortMergeJoin, + joinOperator = SORT_MERGE_JOIN, shjBuildSide = None)) run(JoinSetting( @@ -207,7 +219,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = Seq(rCols.head), leftRelation = lRel, rightRelation = rRel, - joinOperator = shuffledHashJoin, + joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) // The following should not be coalesced because join keys do not match with output @@ -217,7 +229,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), leftRelation = lRel, rightRelation = rRel, - joinOperator = sortMergeJoin, + joinOperator = SORT_MERGE_JOIN, shjBuildSide = None)) run(JoinSetting( @@ -225,7 +237,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), leftRelation = lRel, rightRelation = rRel, - joinOperator = shuffledHashJoin, + joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) // The following will be coalesced since ordering should not matter because it will be @@ -235,7 +247,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = rCols.reverse, leftRelation = lRel, rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = sortMergeJoin, + joinOperator = SORT_MERGE_JOIN, shjBuildSide = None)) run(JoinSetting( @@ -243,8 +255,16 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { rightKeys = rCols.reverse, leftRelation = lRel, rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = shuffledHashJoin, + joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) + + run(JoinSetting( + leftKeys = rCols.reverse, + rightKeys = 
lCols.reverse, + leftRelation = RelationSetting(rCols, 8, Some(4)), + rightRelation = lRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) } } From 2e9aff9fd5a93a0f0529784eb7f805076818b010 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sat, 18 Jul 2020 22:22:20 -0700 Subject: [PATCH 5/7] Change back to single ratio config for SMJ and SHJ, and rebase to latest master --- .../org/apache/spark/sql/internal/SQLConf.scala | 15 +++++--------- .../bucketing/CoalesceBucketsInJoin.scala | 15 ++++++--------- .../bucketing/CoalesceBucketsInJoinSuite.scala | 17 +++++++---------- .../spark/sql/sources/BucketedReadSuite.scala | 2 +- 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 94e9fae59273f..9ec14ab1884fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2644,13 +2644,15 @@ object SQLConf { "the side with a bigger number of buckets will be coalesced to have the same number " + "of buckets as the other side. The bigger number of buckets must be divisible by the " + "smaller number of buckets. Bucket coalescing is applied to sort-merge joins and " + + "shuffled hash joins. Note: Coalescing bucketed tables can avoid unnecessary shuffling " + + "in joins, but it also reduces parallelism and could possibly cause OOM for " + "shuffled hash joins.") .version("3.1.0") .booleanConf .createWithDefault(false) - val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio") + val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced should be less than or " + "equal to this value for bucket coalescing to be applied. This configuration only " + s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) @@ -2681,7 +2683,7 @@ .intConf .checkValue(_ >= 0, "The value must be non-negative.") .createWithDefault(8) - + /** * Holds information about keys that have been deprecated. * @@ -3284,11 +3286,8 @@ class SQLConf extends Serializable with Logging { def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) - def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO) - - def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO) + def coalesceBucketsInJoinMaxBucketRatio: Int = + getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 607eb6fd5661f..258a1a1adc20a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -36,8 +36,7 @@ import org.apache.spark.sql.internal.SQLConf * - The larger bucket number is divisible by the smaller bucket number.
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or, - * COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`). + * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. */ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { private def updateNumCoalescedBucketsInScan( @@ -89,19 +88,17 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { } plan transform { - case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) => - val bucketRatio = math.max(numLeftBuckets, numRightBuckets) / - math.min(numLeftBuckets, numRightBuckets) + case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) + if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= + conf.coalesceBucketsInJoinMaxBucketRatio => val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) join match { - case j: SortMergeJoinExec - if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio => + case j: SortMergeJoinExec => updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) case j: ShuffledHashJoinExec // Only coalesce the buckets for shuffled hash join stream side, // to avoid OOM for build side. - if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio && - isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => + if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) case other => other } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index 317a34e5157c6..7d59a6bdd4fd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -179,16 +179,13 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { } test("the ratio of the number of buckets is greater than max allowed") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) - } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 7068c7ca92013..98886d271e977 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf( SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing buckets is not applied because the ratio of the number of buckets (3) // is greater than max allowed (2). run( From 7b200498be973c2bc5016362f7e65266d56e77c7 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 21 Jul 2020 08:42:47 -0700 Subject: [PATCH 6/7] Address comments in unit test and rebase --- .../org/apache/spark/sql/internal/SQLConf.scala | 14 +------------- .../bucketing/CoalesceBucketsInJoinSuite.scala | 5 ----- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9ec14ab1884fd..c68d7ccab4d10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2661,18 +2661,6 @@ object SQLConf { .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) - val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio") - .doc("The ratio of the number of two buckets being coalesced should be less than or " + - "equal to this value for bucket coalescing to be applied. This configuration only " + - s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " + - "Note as coalescing reduces parallelism, there might be a higher risk for " + - "out of memory error at shuffled hash join build side.") - .version("3.1.0") - .intConf - .checkValue(_ > 0, "The difference must be positive.") - .createWithDefault(2) - val BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT = buildConf("spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit") .internal() @@ -2683,7 +2671,7 @@ object SQLConf { .intConf .checkValue(_ >= 0, "The value must be non-negative.") .createWithDefault(8) - + /** * Holds information about keys that have been deprecated. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index 7d59a6bdd4fd6..89aee37a4246f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -128,16 +128,11 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { } withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildRight))) } } From b1a8a927f283b095716a638a26f9c5d5e9cc380c Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 21 Jul 2020 09:57:51 -0700 Subject: [PATCH 7/7] Change method name to hasScanOperation --- .../execution/bucketing/CoalesceBucketsInJoin.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 258a1a1adc20a..22f308f331449 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -115,9 +115,9 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { */ object ExtractJoinWithBuckets { @tailrec - private def isScanOperation(plan: SparkPlan): Boolean = plan match { - case f: FilterExec => isScanOperation(f.child) - case p: ProjectExec => isScanOperation(p.child) + private def hasScanOperation(plan: SparkPlan): Boolean = plan match { + case f: FilterExec => hasScanOperation(f.child) + case p: ProjectExec => hasScanOperation(p.child) case _: FileSourceScanExec => true case _ => false } @@ -147,8 +147,8 @@ object ExtractJoinWithBuckets { private def isApplicable(j: BaseJoinExec): Boolean = { (j.isInstanceOf[SortMergeJoinExec] || j.isInstanceOf[ShuffledHashJoinExec]) && - isScanOperation(j.left) && - isScanOperation(j.right) && + hasScanOperation(j.left) && + hasScanOperation(j.right) && satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) && satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning) }
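For reference, a minimal Scala sketch of how the finished series is exercised end to end, assuming an existing `SparkSession` named `spark` with a warehouse-backed catalog; the table names `t1` and `t2` are illustrative and not part of the patch:

import spark.implicits._

// Enable the rule introduced by this series, and disable broadcast joins so that
// a sort-merge join or shuffled hash join is planned.
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Two bucketed tables whose bucket counts are divisible
// (ratio 8 / 4 = 2, within the default maxBucketRatio of 4).
Seq(1, 2, 3).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
Seq(2, 3, 4).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")

// The 8-bucket side is read as 4 coalesced buckets, so the join needs no shuffle;
// the t1 scan should report the coalesced bucket count in its explain output.
spark.table("t1").join(spark.table("t2"), "i").explain()

With the config disabled (the default), the same query instead plans an exchange on one side rather than coalescing the scan.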