diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 47cd3c7d62a72..2fa03bed97883 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2655,24 +2655,39 @@ object SQLConf { .booleanConf .createWithDefault(true) - val COALESCE_BUCKETS_IN_JOIN_ENABLED = - buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled") - .doc("When true, if two bucketed tables with the different number of buckets are joined, " + - "the side with a bigger number of buckets will be coalesced to have the same number " + - "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + - "number of buckets. Bucket coalescing is applied to sort-merge joins and " + - "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " + - "in join, but it also reduces parallelism and could possibly cause OOM for " + - "shuffled hash join.") - .version("3.1.0") - .booleanConf - .createWithDefault(false) + object BucketReadStrategyInJoin extends Enumeration { + val COALESCE, REPARTITION, OFF = Value + } - val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") - .doc("The ratio of the number of two buckets being coalesced should be less than or " + - "equal to this value for bucket coalescing to be applied. This configuration only " + - s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") + val BUCKET_READ_STRATEGY_IN_JOIN = + buildConf("spark.sql.sources.bucketing.readStrategyInJoin") + .doc("The bucket read strategy can be set to one of " + + BucketReadStrategyInJoin.values.mkString(", ") + + s". When set to ${BucketReadStrategyInJoin.COALESCE}, if two bucketed tables with " + + "different numbers of buckets are joined, the side with a bigger number of buckets will " + + "be coalesced to have the same number of buckets as the smaller side. When set to " + + s"${BucketReadStrategyInJoin.REPARTITION}, the side with a smaller number of buckets " + + "will be repartitioned to have the same number of buckets as the bigger side. For either " + + "coalescing or repartitioning to be applied, the bigger number of buckets must be " + + "divisible by the smaller number of buckets, and the strategy is applied to sort-merge " + + s"joins and shuffled hash joins. By default, the read strategy is set to " + + s"${BucketReadStrategyInJoin.OFF}, and neither coalescing nor repartitioning is applied. " + + "Note: Coalescing a bucketed table can avoid unnecessary shuffle in a join, but it also " + + "reduces parallelism and could possibly cause OOM for shuffled hash join. Repartitioning " + + "a bucketed table avoids unnecessary shuffle in a join while maintaining parallelism " + + "at the cost of reading duplicate data.") + .version("3.1.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(BucketReadStrategyInJoin.values.map(_.toString)) + .createWithDefault(BucketReadStrategyInJoin.OFF.toString) + + val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio") + .doc("The ratio between the bucket numbers of the two tables should be " + + "less than or equal to this value for bucket coalescing/repartitioning to be applied. 
" + + s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_IN_JOIN.key}' " + + s"is set to a strategy other than '${BucketReadStrategyInJoin.OFF}'.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") @@ -3325,16 +3340,17 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) - def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) - - def coalesceBucketsInJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + def bucketReadStrategyInJoinMaxBucketRatio: Int = + getConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO) def optimizeNullAwareAntiJoin: Boolean = getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN) def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR) + def bucketReadStrategyInJoin: BucketReadStrategyInJoin.Value = + BucketReadStrategyInJoin.withName(getConf(BUCKET_READ_STRATEGY_IN_JOIN)) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 2e7b6fe5f923d..91158272360a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.bucketing.BucketRepartitioningRDD import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -154,7 +155,7 @@ case class RowDataSourceScanExec( * @param requiredSchema Required schema of the underlying relation, excluding partition columns. * @param partitionFilters Predicates to use for partition pruning. * @param optionalBucketSet Bucket ids for bucket pruning. - * @param optionalNumCoalescedBuckets Number of coalesced buckets. + * @param optionalNewNumBuckets Number of buckets to coalesce or repartition. * @param dataFilters Filters on non-partition columns. * @param tableIdentifier identifier for the table in the metastore. */ @@ -164,7 +165,7 @@ case class FileSourceScanExec( requiredSchema: StructType, partitionFilters: Seq[Expression], optionalBucketSet: Option[BitSet], - optionalNumCoalescedBuckets: Option[Int], + optionalNewNumBuckets: Option[Int], dataFilters: Seq[Expression], tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { @@ -172,7 +173,15 @@ case class FileSourceScanExec( // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. 
override lazy val supportsColumnar: Boolean = { - relation.fileFormat.supportBatch(relation.sparkSession, schema) + // `BucketRepartitioningRDD` converts columnar batches to rows to calculate the bucket id for + // each row, so columnar reads are disabled when `BucketRepartitioningRDD` is used to avoid + // converting batches to rows and then back to batches. + relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets + } + + @transient private lazy val isRepartitioningBuckets: Boolean = { + bucketedScan && optionalNewNumBuckets.isDefined && + optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets } private lazy val needsUnsafeRowConversion: Boolean = { @@ -292,7 +301,7 @@ case class FileSourceScanExec( // above val spec = relation.bucketSpec.get val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n)) - val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets) + val numPartitions = optionalNewNumBuckets.getOrElse(spec.numBuckets) val partitioning = HashPartitioning(bucketColumns, numPartitions) val sortColumns = spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get) @@ -314,7 +323,7 @@ case class FileSourceScanExec( val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced. - if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) { + if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) { // TODO Currently Spark does not support writing columns sorting in descending order // so using Ascending order. This can be fixed in future sortColumns.map(attribute => SortOrder(attribute, Ascending)) @@ -360,7 +369,9 @@ case class FileSourceScanExec( } metadata + ("SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" + - optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse(""))) + optionalNewNumBuckets.map { b => + if (b > spec.numBuckets) s" (Repartitioned to $b)" else s" (Coalesced to $b)" + }.getOrElse(""))) } getOrElse { metadata } @@ -548,22 +559,46 @@ case class FileSourceScanExec( filesGroupedToBuckets } - val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets => - logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") - val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) - Seq.tabulate(numCoalescedBuckets) { bucketId => - val partitionedFiles = coalescedBuckets.get(bucketId).map { - _.values.flatten.toArray - }.getOrElse(Array.empty) - FilePartition(bucketId, partitionedFiles) - } - }.getOrElse { - Seq.tabulate(bucketSpec.numBuckets) { bucketId => + if (optionalNewNumBuckets.isEmpty) { + val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } else { + val newNumBuckets = optionalNewNumBuckets.get + if (newNumBuckets < bucketSpec.numBuckets) { + assert(bucketSpec.numBuckets % newNumBuckets == 0) + logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") + val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets) + val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => + val partitionedFiles = coalescedBuckets + .get(bucketId) + .map(_.values.flatten.toArray) + .getOrElse(Array.empty) + FilePartition(bucketId, partitionedFiles) 
+ } + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } else { + assert(newNumBuckets % bucketSpec.numBuckets == 0) + logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") + val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => + FilePartition( + bucketId, + prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty)) + } + // There are now more files to be read. + val filesNum = filePartitions.map(_.files.size.toLong).sum + val filesSize = filePartitions.map(_.files.map(_.length).sum).sum + driverMetrics("numFiles") = filesNum + driverMetrics("filesSize") = filesSize + new BucketRepartitioningRDD( + fsRelation.sparkSession, + readFile, + filePartitions, + outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression, + output) + } } - - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } /** @@ -622,7 +657,7 @@ case class FileSourceScanExec( QueryPlan.normalizePredicates( filterUnusedDynamicPruningExpressions(partitionFilters), output), optionalBucketSet, - optionalNumCoalescedBuckets, + optionalNewNumBuckets, QueryPlan.normalizePredicates(dataFilters, output), None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index ace2a11ddaa92..3f16dfade48a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} -import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin +import org.apache.spark.sql.execution.bucketing.CoalesceOrRepartitionBucketsInJoin import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -339,7 +339,7 @@ object QueryExecution { // as the original plan is hidden behind `AdaptiveSparkPlanExec`. adaptiveExecutionRule.toSeq ++ Seq( - CoalesceBucketsInJoin(sparkSession.sessionState.conf), + CoalesceOrRepartitionBucketsInJoin(sparkSession.sessionState.conf), PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), RemoveRedundantProjects(sparkSession.sessionState.conf), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala new file mode 100644 index 0000000000000..71f9072b64c4b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import scala.collection.JavaConverters._ + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection} +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * An RDD that filters out the rows that do not belong to the current bucket file being read. + */ +private[spark] class BucketRepartitioningRDD( + @transient private val sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient override val filePartitions: Seq[FilePartition], + bucketIdExpression: Expression, + output: Seq[Attribute]) + extends FileScanRDD(sparkSession, readFunction, filePartitions) { + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val iter: Iterator[_] = super.compute(split, context) + iter.flatMap { + case row: InternalRow => Seq(row) + case batch: ColumnarBatch => batch.rowIterator().asScala + }.filter(getBucketId(_) == split.index) + } + + private lazy val getBucketId: InternalRow => Int = { + val projection = UnsafeProjection.create(Seq(bucketIdExpression), output) + row => projection(row).getInt(0) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala similarity index 65% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index 22f308f331449..01508f80471ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -27,45 +27,48 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyInJoin /** - * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin` + * This rule coalesces or repartitions one side of the `SortMergeJoin` and `ShuffledHashJoin` * if the following conditions are met: * - Two bucketed tables are joined. * - Join keys match with output partition expressions on their respective sides. * - The larger bucket number is divisible by the smaller bucket number. - * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. 
+ * BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO. + * + * The bucketed table with a larger number of buckets is coalesced if BUCKET_READ_STRATEGY_IN_JOIN + * is set to BucketReadStrategyInJoin.COALESCE, whereas the bucketed table with a smaller number of + * buckets is repartitioned if BUCKET_READ_STRATEGY_IN_JOIN is set to + * BucketReadStrategyInJoin.REPARTITION. */ -case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { - private def updateNumCoalescedBucketsInScan( +case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { + private def updateNumBucketsInScan( plan: SparkPlan, - numCoalescedBuckets: Int): SparkPlan = { + newNumBuckets: Int): SparkPlan = { plan transformUp { case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + f.copy(optionalNewNumBuckets = Some(newNumBuckets)) } } - private def updateNumCoalescedBuckets( + private def updateNumBuckets( join: BaseJoinExec, numLeftBuckets: Int, numRightBucket: Int, - numCoalescedBuckets: Int): BaseJoinExec = { - if (numCoalescedBuckets != numLeftBuckets) { - val leftCoalescedChild = - updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets) + newNumBuckets: Int): BaseJoinExec = { + if (newNumBuckets != numLeftBuckets) { + val leftChild = updateNumBucketsInScan(join.left, newNumBuckets) join match { - case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) - case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) + case j: SortMergeJoinExec => j.copy(left = leftChild) + case j: ShuffledHashJoinExec => j.copy(left = leftChild) } } else { - val rightCoalescedChild = - updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets) + val rightChild = updateNumBucketsInScan(join.right, newNumBuckets) join match { - case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) - case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) + case j: SortMergeJoinExec => j.copy(right = rightChild) + case j: ShuffledHashJoinExec => j.copy(right = rightChild) } } } @@ -83,23 +86,34 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { } def apply(plan: SparkPlan): SparkPlan = { - if (!conf.coalesceBucketsInJoinEnabled) { + val bucketReadStrategy = conf.bucketReadStrategyInJoin + if (bucketReadStrategy == BucketReadStrategyInJoin.OFF) { return plan } plan transform { case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= - conf.coalesceBucketsInJoinMaxBucketRatio => - val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) + conf.bucketReadStrategyInJoinMaxBucketRatio => + val newNumBuckets = if (bucketReadStrategy == BucketReadStrategyInJoin.COALESCE) { + math.min(numLeftBuckets, numRightBuckets) + } else { + math.max(numLeftBuckets, numRightBuckets) + } join match { case j: SortMergeJoinExec => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) - case j: ShuffledHashJoinExec - // Only coalesce the buckets for shuffled hash join stream side, - // to avoid OOM for build side. 
- if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case j: ShuffledHashJoinExec => + bucketReadStrategy match { + case BucketReadStrategyInJoin.REPARTITION => + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case BucketReadStrategyInJoin.COALESCE + if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets) => + // Only coalesce the buckets for shuffled hash join stream side, + // to avoid OOM for build side. + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case _ => j + } case other => other } case other => other @@ -125,7 +139,7 @@ object ExtractJoinWithBuckets { private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { plan.collectFirst { case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && - f.optionalNumCoalescedBuckets.isEmpty => + f.optionalNewNumBuckets.isEmpty => f.relation.bucketSpec.get } } @@ -155,8 +169,9 @@ object ExtractJoinWithBuckets { private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = { val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) - // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller - // number of buckets because bucket id is calculated by modding the total number of buckets. + // A bucket can be coalesced or repartitioned only if the bigger number of buckets is divisible + // by the smaller number of buckets because bucket id is calculated by modding the total number + // of buckets. numBuckets1 != numBuckets2 && large % small == 0 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index e43a4147ceb63..4621406e862a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -344,19 +344,24 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } - test("Coalesced bucket info should be a part of explain string") { - withTable("t1", "t2") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") - Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") - val df1 = spark.table("t1") - val df2 = spark.table("t2") - val joined = df1.join(df2, df1("i") === df2("i")) - checkKeywordsExistsInExplain( - joined, - SimpleMode, - "SelectedBucketsCount: 8 out of 8 (Coalesced to 4)" :: Nil: _*) + test("Coalesced/repartitioned bucket info should be a part of explain string") { + Seq((SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + "8 out of 8 (Coalesced to 4)"), + (SQLConf.BucketReadStrategyInJoin.REPARTITION.toString, + "4 out of 4 (Repartitioned to 8)")).foreach { case (strategy, expected) => + withTable("t1", "t2") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") + Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") + val df1 = spark.table("t1") + val df2 = spark.table("t2") + val joined = df1.join(df2, df1("i") === df2("i")) + checkKeywordsExistsInExplain( + joined, + SimpleMode, + s"SelectedBucketsCount: 
$expected" :: Nil: _*) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala deleted file mode 100644 index 89aee37a4246f..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.bucketing - -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan} -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} -import org.apache.spark.sql.types.{IntegerType, StructType} - -class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { - private val SORT_MERGE_JOIN = "sortMergeJoin" - private val SHUFFLED_HASH_JOIN = "shuffledHashJoin" - private val BROADCAST_HASH_JOIN = "broadcastHashJoin" - - case class RelationSetting( - cols: Seq[Attribute], - numBuckets: Int, - expectedCoalescedNumBuckets: Option[Int]) - - object RelationSetting { - def apply(numBuckets: Int, expectedCoalescedNumBuckets: Option[Int]): RelationSetting = { - val cols = Seq(AttributeReference("i", IntegerType)()) - RelationSetting(cols, numBuckets, expectedCoalescedNumBuckets) - } - } - - case class JoinSetting( - leftKeys: Seq[Attribute], - rightKeys: Seq[Attribute], - leftRelation: RelationSetting, - rightRelation: RelationSetting, - joinOperator: String, - shjBuildSide: Option[BuildSide]) - - object JoinSetting { - def apply( - l: RelationSetting, - r: RelationSetting, - joinOperator: String = SORT_MERGE_JOIN, - shjBuildSide: Option[BuildSide] = None): JoinSetting = { - JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide) - } - } - - private def newFileSourceScanExec(setting: RelationSetting): FileSourceScanExec = { - val relation = HadoopFsRelation( - location = new InMemoryFileIndex(spark, Nil, Map.empty, None), - partitionSchema = PartitionSpec.emptySpec.partitionColumns, - dataSchema = StructType.fromAttributes(setting.cols), - bucketSpec = 
Some(BucketSpec(setting.numBuckets, setting.cols.map(_.name), Nil)), - fileFormat = new ParquetFileFormat(), - options = Map.empty)(spark) - FileSourceScanExec(relation, setting.cols, relation.dataSchema, Nil, None, None, Nil, None) - } - - private def run(setting: JoinSetting): Unit = { - val swappedSetting = setting.copy( - leftKeys = setting.rightKeys, - rightKeys = setting.leftKeys, - leftRelation = setting.rightRelation, - rightRelation = setting.leftRelation) - - val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN) { - Seq(setting, swappedSetting) - } else { - Seq(setting) - } - settings.foreach { s => - val lScan = newFileSourceScanExec(s.leftRelation) - val rScan = newFileSourceScanExec(s.rightRelation) - val join = if (s.joinOperator == SORT_MERGE_JOIN) { - SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan) - } else if (s.joinOperator == SHUFFLED_HASH_JOIN) { - ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan) - } else { - BroadcastHashJoinExec( - s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan) - } - - val plan = CoalesceBucketsInJoin(spark.sessionState.conf)(join) - - def verify(expected: Option[Int], subPlan: SparkPlan): Unit = { - val coalesced = subPlan.collect { - case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.nonEmpty => - f.optionalNumCoalescedBuckets.get - } - if (expected.isDefined) { - assert(coalesced.size == 1 && coalesced.head == expected.get) - } else { - assert(coalesced.isEmpty) - } - } - - verify(s.leftRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].left) - verify(s.rightRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].right) - } - } - - test("bucket coalescing - basic") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } - - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } - } - - test("bucket coalescing should work only for sort merge join and shuffled hash join") { - Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) - } - } - } - - test("bucket coalescing shouldn't be applied to shuffled hash join build side") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildRight))) - } - } - - test("bucket coalescing shouldn't be applied when the number of buckets are the same") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } - } - - test("number of bucket is not divisible by 
other number of bucket") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } - } - - test("the ratio of the number of buckets is greater than max allowed") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - } - } - - test("join keys should match with output partitioning") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - val lCols = Seq( - AttributeReference("l1", IntegerType)(), - AttributeReference("l2", IntegerType)()) - val rCols = Seq( - AttributeReference("r1", IntegerType)(), - AttributeReference("r2", IntegerType)()) - - val lRel = RelationSetting(lCols, 4, None) - val rRel = RelationSetting(rCols, 8, None) - - // The following should not be coalesced because join keys do not match with output - // partitioning (missing one expression). - run(JoinSetting( - leftKeys = Seq(lCols.head), - rightKeys = Seq(rCols.head), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) - - run(JoinSetting( - leftKeys = Seq(lCols.head), - rightKeys = Seq(rCols.head), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - - // The following should not be coalesced because join keys do not match with output - // partitioning (more expressions). - run(JoinSetting( - leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), - rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) - - run(JoinSetting( - leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), - rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - - // The following will be coalesced since ordering should not matter because it will be - // adjusted in `EnsureRequirements`. 
- run(JoinSetting( - leftKeys = lCols.reverse, - rightKeys = rCols.reverse, - leftRelation = lRel, - rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) - - run(JoinSetting( - leftKeys = lCols.reverse, - rightKeys = rCols.reverse, - leftRelation = lRel, - rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) - - run(JoinSetting( - leftKeys = rCols.reverse, - rightKeys = lCols.reverse, - leftRelation = RelationSetting(rCols, 8, Some(4)), - rightRelation = lRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildRight))) - } - } - - test("FileSourceScanExec's metadata should be updated with coalesced info") { - val scan = newFileSourceScanExec(RelationSetting(8, None)) - val value = scan.metadata("SelectedBucketsCount") - assert(value === "8 out of 8") - - val scanWithCoalescing = scan.copy(optionalNumCoalescedBuckets = Some(4)) - val valueWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount") - assert(valueWithCoalescing == "8 out of 8 (Coalesced to 4)") - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala new file mode 100644 index 0000000000000..d1fca669c7c1e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, StructType} + +class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { + private val SORT_MERGE_JOIN = "sortMergeJoin" + private val SHUFFLED_HASH_JOIN = "shuffledHashJoin" + private val BROADCAST_HASH_JOIN = "broadcastHashJoin" + + case class RelationSetting( + cols: Seq[Attribute], + numBuckets: Int, + expectedNumBuckets: Option[Int]) + + object RelationSetting { + def apply(numBuckets: Int, expectedNumBuckets: Option[Int]): RelationSetting = { + val cols = Seq(AttributeReference("i", IntegerType)()) + RelationSetting(cols, numBuckets, expectedNumBuckets) + } + } + + case class JoinSetting( + leftKeys: Seq[Attribute], + rightKeys: Seq[Attribute], + leftRelation: RelationSetting, + rightRelation: RelationSetting, + joinOperator: String, + shjBuildSide: Option[BuildSide]) + + object JoinSetting { + def apply( + l: RelationSetting, + r: RelationSetting, + joinOperator: String = SORT_MERGE_JOIN, + shjBuildSide: Option[BuildSide] = None): JoinSetting = { + JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide) + } + } + + private def newFileSourceScanExec(setting: RelationSetting): FileSourceScanExec = { + val relation = HadoopFsRelation( + location = new InMemoryFileIndex(spark, Nil, Map.empty, None), + partitionSchema = PartitionSpec.emptySpec.partitionColumns, + dataSchema = StructType.fromAttributes(setting.cols), + bucketSpec = Some(BucketSpec(setting.numBuckets, setting.cols.map(_.name), Nil)), + fileFormat = new ParquetFileFormat(), + options = Map.empty)(spark) + FileSourceScanExec(relation, setting.cols, relation.dataSchema, Nil, None, None, Nil, None) + } + + private def run(setting: JoinSetting): Unit = { + val swappedSetting = setting.copy( + leftKeys = setting.rightKeys, + rightKeys = setting.leftKeys, + leftRelation = setting.rightRelation, + rightRelation = setting.leftRelation, + shjBuildSide = setting.shjBuildSide.map { + case BuildLeft => BuildRight + case BuildRight => BuildLeft + }) + + val settings = Seq(setting, swappedSetting) + + settings.foreach { s => + val lScan = newFileSourceScanExec(s.leftRelation) + val rScan = newFileSourceScanExec(s.rightRelation) + val join = if (s.joinOperator == SORT_MERGE_JOIN) { + SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan) + } else if (s.joinOperator == SHUFFLED_HASH_JOIN) { + ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan) + } else { + BroadcastHashJoinExec( + s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan) + } + + val plan = CoalesceOrRepartitionBucketsInJoin(spark.sessionState.conf)(join) + + def verify(expected: Option[Int], subPlan: SparkPlan): Unit 
= { + val optionalNewNumBuckets = subPlan.collect { + case f: FileSourceScanExec if f.optionalNewNumBuckets.nonEmpty => + f.optionalNewNumBuckets.get + } + if (expected.isDefined) { + assert(optionalNewNumBuckets.size == 1 && optionalNewNumBuckets.head == expected.get) + } else { + assert(optionalNewNumBuckets.isEmpty) + } + } + + verify(s.leftRelation.expectedNumBuckets, plan.asInstanceOf[BinaryExecNode].left) + verify(s.rightRelation.expectedNumBuckets, plan.asInstanceOf[BinaryExecNode].right) + } + } + + test("bucket coalescing - basic") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + + test("bucket repartitioning - basic") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(buildSide))) + } + } + + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, None), joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(buildSide))) + } + } + } + + test("bucket coalesce/repartition should work only for sort merge join and shuffled hash join") { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) + } + } + } + + test("bucket coalescing shouldn't be applied to shuffled hash join build side") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) + } + } + + test("bucket coalesce/repartition shouldn't be applied when the number of buckets are the same") { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + } + + 
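// Illustrative note on the divisibility requirement checked below: a row's bucket id is its
+  // hash modulo the number of buckets, so when coalescing 8 buckets into 4, every row in
+  // original bucket b lands in coalesced bucket b % 4 (buckets 1 and 5 both map to 1), and
+  // repartitioning 4 into 8 splits original bucket b exactly into new buckets b and b + 4.
+  // With 8 and 3 buckets no such mapping exists, so neither strategy is applied.
+ 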
test("number of bucket is not divisible by other number of bucket") { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + } + + test("the ratio of the number of buckets is greater than max allowed") { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy, + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + } + + test("join keys should match with output partitioning") { + val lCols = Seq( + AttributeReference("l1", IntegerType)(), + AttributeReference("l2", IntegerType)()) + val rCols = Seq( + AttributeReference("r1", IntegerType)(), + AttributeReference("r2", IntegerType)()) + + val lRel = RelationSetting(lCols, 4, None) + val rRel = RelationSetting(rCols, 8, None) + + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + // The following should not be coalesced because join keys do not match with output + // partitioning (missing one expression). + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + + // The following should not be coalesced because join keys do not match with output + // partitioning (more expressions). + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { + // The following will be coalesced since ordering should not matter because it will be + // adjusted in `EnsureRequirements`. 
+ val setting = JoinSetting( + leftKeys = lCols.reverse, + rightKeys = rCols.reverse, + leftRelation = lRel, + rightRelation = RelationSetting(rCols, 8, Some(4)), + joinOperator = "", + shjBuildSide = None) + + run(setting.copy(joinOperator = SORT_MERGE_JOIN)) + run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) + } + + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { + // The following will be repartitioned since ordering should not matter because it will be + // adjusted in `EnsureRequirements`. + val setting = JoinSetting( + leftKeys = lCols.reverse, + rightKeys = rCols.reverse, + leftRelation = RelationSetting(lCols, 4, Some(8)), + rightRelation = rRel, + joinOperator = "", + shjBuildSide = None) + + run(setting.copy(joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(buildSide))) + } + } + } + + test("FileSourceScanExec's metadata should be updated with coalesced/repartitioned info") { + val scan = newFileSourceScanExec(RelationSetting(8, None)) + val value = scan.metadata("SelectedBucketsCount") + assert(value === "8 out of 8") + + val scanWithCoalescing = scan.copy(optionalNewNumBuckets = Some(4)) + val metadataWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount") + assert(metadataWithCoalescing == "8 out of 8 (Coalesced to 4)") + + val scanWithRepartitioning = scan.copy(optionalNewNumBuckets = Some(16)) + val metadataWithRepartitioning = scanWithRepartitioning.metadata("SelectedBucketsCount") + assert(metadataWithRepartitioning == "8 out of 8 (Repartitioned to 16)") + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 98886d271e977..fbe547f37cde2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -876,7 +876,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket coalescing eliminates shuffle") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. // Currently, sort will be introduced for the side that is coalesced. val testSpec1 = BucketedTableTestSpec( @@ -901,7 +902,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - test("bucket coalescing is not satisfied") { + test("bucket coalescing/repartitioning is not satisfied") { def run(testSpec1: BucketedTableTestSpec, testSpec2: BucketedTableTestSpec): Unit = { Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs => testBucketing( @@ -911,74 +912,140 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { - // Coalescing buckets is disabled by a config. + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { + // Bucket read strategy is off. 
run( BucketedTableTestSpec( - Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), BucketedTableTestSpec( - Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) } - withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { - // Coalescing buckets is not applied because the ratio of the number of buckets (3) - // is greater than max allowed (2). - run( - BucketedTableTestSpec( - Some(BucketSpec(12, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf( + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy, + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + // Coalescing/repartitioning buckets is not applied because the ratio of + // the number of buckets (3) is greater than max allowed (2). + run( + BucketedTableTestSpec( + Some(BucketSpec(12, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) + } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run( - // Coalescing buckets is not applied because the bigger number of buckets (8) is not - // divisible by the smaller number of buckets (7). - BucketedTableTestSpec( - Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(7, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { + run( + // Coalescing/repartitioning buckets is not applied because the bigger number of + // buckets (8) is not divisible by the smaller number of buckets (7). 
+ BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(7, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) + } } } - test("bucket coalescing is applied when join expressions match with partitioning expressions") { + test("bucket coalesce/repartition is applied when join expressions match with partitioning " + + "expressions") { withTable("t1", "t2") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") - withSQLConf( - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - def verify( - query: String, - expectedNumShuffles: Int, - expectedCoalescedNumBuckets: Option[Int]): Unit = { - val plan = sql(query).queryExecution.executedPlan - val shuffles = plan.collect { case s: ShuffleExchangeExec => s } - assert(shuffles.length == expectedNumShuffles) - - val scans = plan.collect { - case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f - } - if (expectedCoalescedNumBuckets.isDefined) { - assert(scans.length == 1) - assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) - } else { - assert(scans.isEmpty) + Seq((SQLConf.BucketReadStrategyInJoin.COALESCE, 4), + (SQLConf.BucketReadStrategyInJoin.REPARTITION, 8)).foreach { case (strategy, numBuckets) => + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy.toString) { + def verify( + query: String, + expectedNumShuffles: Int, + expectedNumBuckets: Option[Int]): Unit = { + val plan = sql(query).queryExecution.executedPlan + val shuffles = plan.collect { case s: ShuffleExchangeExec => s } + assert(shuffles.length == expectedNumShuffles) + + val scans = plan.collect { + case f: FileSourceScanExec if f.optionalNewNumBuckets.isDefined => f + } + if (expectedNumBuckets.isDefined) { + assert(scans.length == 1) + assert(scans.head.optionalNewNumBuckets == expectedNumBuckets) + } else { + assert(scans.isEmpty) + } } + + // Bucket strategy applied since join expressions match with the bucket columns. + verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(numBuckets)) + // Bucket strategy applied when columns are aliased. + verify( + "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y", + 0, + Some(numBuckets)) + // Bucket strategy is not applied when join expressions do not match with bucket columns. + verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None) } + } + } + } - // Coalescing applied since join expressions match with the bucket columns. - verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(4)) - // Coalescing applied when columns are aliased. - verify( - "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y", - 0, - Some(4)) - // Coalescing is not applied when join expressions do not match with bucket columns. - verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None) + test("bucket repartitioning eliminates shuffle") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { + // The side with testSpec2 will be repartitioned to have 8 output partitions. 
+ val testSpec1 = BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = false, + expectedSort = false, + expectedNumOutputPartitions = Some(8)) + val testSpec2 = BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = false, + expectedSort = false, + expectedNumOutputPartitions = Some(8)) + + Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs => + testBucketing( + bucketedTableTestSpecLeft = specs._1, + bucketedTableTestSpecRight = specs._2, + joinCondition = joinCondition(Seq("i", "j"))) + } + } + } + + test("bucket repartitioning should work with wholestage codegen enabled") { + withTable("t1", "t2") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") + df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2") + + val t1 = spark.table("t1") + val t2 = spark.table("t2") + val expected = t1.join(t2, t1("i") === t2("i")).collect + + withSQLConf( + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString, + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + val t1 = spark.table("t1") + val t2 = spark.table("t2") + val actual = t1.join(t2, t1("i") === t2("i")) + val plan = actual.queryExecution.executedPlan + assert(plan.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty) + checkAnswer(actual, expected) } } }
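A minimal end-to-end sketch of how the new read strategy is exercised (reviewer note, not part of the patch): the session `spark`, the table names and the row counts are assumptions; the config key, the enum values and the expected explain output are taken from this change.

  // Two bucketed tables whose bucket counts (8 and 4) satisfy the divisibility requirement.
  spark.range(100).selectExpr("id AS i").write.bucketBy(8, "i").saveAsTable("t1")
  spark.range(100).selectExpr("id AS i").write.bucketBy(4, "i").saveAsTable("t2")

  // With REPARTITION, the 4-bucket side is read as 8 partitions, so the join needs no shuffle
  // while keeping the parallelism of the 8-bucket side, at the cost of reading the smaller
  // side's files twice.
  spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin", "REPARTITION")
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
  val t1 = spark.table("t1")
  val t2 = spark.table("t2")
  t1.join(t2, t1("i") === t2("i")).explain()
  // The plan is expected to contain: SelectedBucketsCount: 4 out of 4 (Repartitioned to 8)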