From c2c7a595078f3e515a04b513933b413238d7a143 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 12 Aug 2020 17:09:23 -0700 Subject: [PATCH 01/10] initial checkin --- .../apache/spark/sql/internal/SQLConf.scala | 15 +++++ .../sql/execution/DataSourceScanExec.scala | 45 ++++++++++---- .../bucketing/CoalesceBucketsInJoin.scala | 14 +++-- .../execution/bucketing/SplitBucketRDD.scala | 59 +++++++++++++++++++ .../spark/sql/sources/BucketedReadSuite.scala | 32 ++++++++++ 5 files changed, 147 insertions(+), 18 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 57fc1bd99be28..03d1f78307592 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2665,6 +2665,19 @@ object SQLConf { .booleanConf .createWithDefault(false) + val SPLIT_BUCKETS_IN_JOIN_ENABLED = + buildConf("spark.sql.bucketing.splitBucketsInJoin.enabled") + .doc("When true, if two bucketed tables with the different number of buckets are joined, " + + "the side with a bigger number of buckets will be coalesced to have the same number " + + "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + + "number of buckets. Bucket coalescing is applied to sort-merge joins and " + + "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " + + "in join, but it also reduces parallelism and could possibly cause OOM for " + + "shuffled hash join.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced should be less than or " + @@ -3311,6 +3324,8 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + def splitBucketsInJoinEnabled: Boolean = getConf(SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED) + def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) def coalesceBucketsInJoinMaxBucketRatio: Int = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index bef9f4b46c628..bb8e15395c318 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.bucketing.SplitBucketRDD import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -549,22 +550,40 @@ case class FileSourceScanExec( filesGroupedToBuckets } - val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets => - logInfo(s"Coalescing to ${numCoalescedBuckets} buckets") - val 
coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets) - Seq.tabulate(numCoalescedBuckets) { bucketId => - val partitionedFiles = coalescedBuckets.get(bucketId).map { - _.values.flatten.toArray - }.getOrElse(Array.empty) - FilePartition(bucketId, partitionedFiles) - } - }.getOrElse { - Seq.tabulate(bucketSpec.numBuckets) { bucketId => + if (optionalNumCoalescedBuckets.isEmpty) { + val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } else { + val newNumBuckets = optionalNumCoalescedBuckets.get + if (newNumBuckets < bucketSpec.numBuckets) { + logInfo(s"Coalescing to $newNumBuckets buckets") + val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets) + val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => + val partitionedFiles = coalescedBuckets + .get(bucketId) + .map(_.values.flatten.toArray) + .getOrElse(Array.empty) + FilePartition(bucketId, partitionedFiles) + } + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + } else { + logInfo(s"Splitting to $newNumBuckets buckets") + val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => + FilePartition( + bucketId, + prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty)) + } + // There will be more files read. + val filesNum = filePartitions.map(_.files.size.toLong).sum + val filesSize = filePartitions.map(_.files.map(_.length).sum).sum + driverMetrics("numFiles") = filesNum + driverMetrics("filesSize") = filesSize + new SplitBucketRDD( + fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output) + } } - - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 22f308f331449..f1fbbb87bcc6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -83,7 +83,7 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { } def apply(plan: SparkPlan): SparkPlan = { - if (!conf.coalesceBucketsInJoinEnabled) { + if (!conf.coalesceBucketsInJoinEnabled && !conf.splitBucketsInJoinEnabled) { return plan } @@ -91,15 +91,19 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= conf.coalesceBucketsInJoinMaxBucketRatio => - val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) + val newNumBuckets = if (conf.coalesceBucketsInJoinEnabled) { + math.min(numLeftBuckets, numRightBuckets) + } else { + math.max(numLeftBuckets, numRightBuckets) + } join match { case j: SortMergeJoinExec => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) case j: ShuffledHashJoinExec // Only coalesce the buckets for shuffled hash join stream side, // to avoid OOM for build side. 
- if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets) => + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) case other => other } case other => other diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala new file mode 100644 index 0000000000000..84b275b858ff9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.vectorized.ColumnarBatch + +private[spark] class SplitBucketRDD( + @transient private val sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient override val filePartitions: Seq[FilePartition], + bucketSpec: BucketSpec, + newNumBuckets: Int, + output: Seq[Attribute]) + extends FileScanRDD(sparkSession, readFunction, filePartitions) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val iter: Iterator[_] = super.compute(split, context) + iter.map { + case batch: ColumnarBatch => + batch.rowIterator().next() + case other => other + }.filter { + case r: InternalRow => + getBucketId(r) == split.index + case _ => false + }.asInstanceOf[Iterator[InternalRow]] + } + + private lazy val getBucketId: InternalRow => Int = { + val bucketIdExpression = { + val bucketColumns = bucketSpec.bucketColumnNames.map(c => output.find(_.name == c).get) + HashPartitioning(bucketColumns, newNumBuckets).partitionIdExpression + } + + val projection = UnsafeProjection.create(Seq(bucketIdExpression), output) + row => projection(row).getInt(0) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 98886d271e977..c4eedfda289db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -982,4 +982,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } } + + test("split shuffle") { + withTable("t1", "t2") { + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") + df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2") + // scalastyle:off println + withSQLConf( + SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "false", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val t1 = spark.table("t1") + val t2 = spark.table("t2") + val joined = t1.join(t2, t1("i") === t2("i"), "right") + joined.explain + println(joined.count) + val expected = joined.collect() + } + + withSQLConf(SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val t1 = spark.table("t1") + val t2 = spark.table("t2") + val joined2 = t1.join(t2, t1("i") === t2("i"), "right") + joined2.explain + joined2.show + println(joined2.count) + // checkAnswer(joined2, expected) + } + // scalastyle:on println + } + } } From 5d8390c2543e7fe5172ef854ef1dc4753d5c07cc Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 14 Aug 2020 13:43:23 -0700 Subject: [PATCH 02/10] columnar batch support --- .../execution/bucketing/SplitBucketRDD.scala | 37 +++++++++++++++++-- .../spark/sql/sources/BucketedReadSuite.scala | 24 ++++++------ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala index 84b275b858ff9..4c9806b4a2591 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala @@ -23,7 +23,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.RowToColumnConverter import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch private[spark] class SplitBucketRDD( @@ -34,16 +37,44 @@ private[spark] class SplitBucketRDD( newNumBuckets: Int, output: Seq[Attribute]) extends FileScanRDD(sparkSession, readFunction, filePartitions) { + + private val numRows: Int = sparkSession.sessionState.conf.columnBatchSize + private val useOffHeap: Boolean = sparkSession.sessionState.conf.offHeapColumnVectorEnabled + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val schema = StructType.fromAttributes(output) + val converters = new RowToColumnConverter(schema) + val vectors = if (useOffHeap) { + OffHeapColumnVector.allocateColumns(numRows, schema) + } else { + OnHeapColumnVector.allocateColumns(numRows, schema) + } + val columnarBatch = new ColumnarBatch(vectors.toArray) + context.addTaskCompletionListener[Unit] { _ => + columnarBatch.close() + } + val iter: Iterator[_] = super.compute(split, context) iter.map { + case row: InternalRow => row case batch: 
ColumnarBatch => - batch.rowIterator().next() - case other => other + val rowIterator = batch.rowIterator() + columnarBatch.setNumRows(0) + vectors.foreach(_.reset()) + var rowCount = 0 + while (rowIterator.hasNext) { + val row = rowIterator.next() + if (getBucketId(row) == split.index) { + converters.convert(row, vectors.toArray) + rowCount += 1 + } + } + columnarBatch.setNumRows(rowCount) + columnarBatch }.filter { case r: InternalRow => getBucketId(r) == split.index - case _ => false + case _: ColumnarBatch => true }.asInstanceOf[Iterator[InternalRow]] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index c4eedfda289db..3e520c0bfb059 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -988,24 +988,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2") // scalastyle:off println - withSQLConf( - SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "false", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - val t1 = spark.table("t1") - val t2 = spark.table("t2") - val joined = t1.join(t2, t1("i") === t2("i"), "right") - joined.explain - println(joined.count) - val expected = joined.collect() - } +// withSQLConf( +// SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "false", +// SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", +// SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { +// val t1 = spark.table("t1") +// val t2 = spark.table("t2") +// val joined = t1.join(t2, t1("i") === t2("i"), "right") +// joined.explain +// println(joined.count) +// val expected = joined.collect() +// } withSQLConf(SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val t1 = spark.table("t1") val t2 = spark.table("t2") - val joined2 = t1.join(t2, t1("i") === t2("i"), "right") + val joined2 = t1.join(t2, t1("i") === t2("i")) joined2.explain joined2.show println(joined2.count) From 404020b01e8155d7921ce0f95e4b8b1ad2425cba Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 14 Aug 2020 14:45:47 -0700 Subject: [PATCH 03/10] disable columnar to row --- .../sql/execution/DataSourceScanExec.scala | 7 ++- .../execution/bucketing/SplitBucketRDD.scala | 46 +++---------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index bb8e15395c318..ed5afa474e2c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -175,7 +175,12 @@ case class FileSourceScanExec( // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. 
override lazy val supportsColumnar: Boolean = { - relation.fileFormat.supportBatch(relation.sparkSession, schema) + val isSplittingBucket = relation.bucketSpec.isDefined && + optionalNumCoalescedBuckets.isDefined && + optionalNumCoalescedBuckets.get > relation.bucketSpec.get.numBuckets + + relation.fileFormat.supportBatch(relation.sparkSession, schema) && + !isSplittingBucket } private lazy val needsUnsafeRowConversion: Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala index 4c9806b4a2591..50ba197e9923a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql.execution.bucketing +import scala.collection.JavaConverters._ + import org.apache.spark.{Partition, TaskContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.RowToColumnConverter import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} -import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch private[spark] class SplitBucketRDD( @@ -37,45 +36,12 @@ private[spark] class SplitBucketRDD( newNumBuckets: Int, output: Seq[Attribute]) extends FileScanRDD(sparkSession, readFunction, filePartitions) { - - private val numRows: Int = sparkSession.sessionState.conf.columnBatchSize - private val useOffHeap: Boolean = sparkSession.sessionState.conf.offHeapColumnVectorEnabled - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val schema = StructType.fromAttributes(output) - val converters = new RowToColumnConverter(schema) - val vectors = if (useOffHeap) { - OffHeapColumnVector.allocateColumns(numRows, schema) - } else { - OnHeapColumnVector.allocateColumns(numRows, schema) - } - val columnarBatch = new ColumnarBatch(vectors.toArray) - context.addTaskCompletionListener[Unit] { _ => - columnarBatch.close() - } - val iter: Iterator[_] = super.compute(split, context) - iter.map { - case row: InternalRow => row - case batch: ColumnarBatch => - val rowIterator = batch.rowIterator() - columnarBatch.setNumRows(0) - vectors.foreach(_.reset()) - var rowCount = 0 - while (rowIterator.hasNext) { - val row = rowIterator.next() - if (getBucketId(row) == split.index) { - converters.convert(row, vectors.toArray) - rowCount += 1 - } - } - columnarBatch.setNumRows(rowCount) - columnarBatch - }.filter { - case r: InternalRow => - getBucketId(r) == split.index - case _: ColumnarBatch => true - }.asInstanceOf[Iterator[InternalRow]] + iter.flatMap { + case row: InternalRow => Seq(row) + case batch: ColumnarBatch => batch.rowIterator().asScala + }.filter(getBucketId(_) == split.index) } private lazy val getBucketId: InternalRow => Int = { From bc5fcd21b76af724f22079568739194fa811f483 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 18 Aug 2020 10:35:26 -0700 Subject: [PATCH 04/10] renaming --- .../sql/execution/DataSourceScanExec.scala | 24 
+++++++++---------- .../spark/sql/execution/QueryExecution.scala | 4 ++-- ... CoalesceOrRepartitionBucketsInJoin.scala} | 14 +++++------ ...esceOrRepartitionBucketsInJoinSuite.scala} | 10 ++++---- .../spark/sql/sources/BucketedReadSuite.scala | 4 ++-- 5 files changed, 28 insertions(+), 28 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/{CoalesceBucketsInJoin.scala => CoalesceOrRepartitionBucketsInJoin.scala} (93%) rename sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/{CoalesceBucketsInJoinSuite.scala => CoalesceOrRepartitionBucketsInJoinSuite.scala} (96%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index ed5afa474e2c7..4694dd04d7195 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -157,7 +157,7 @@ case class RowDataSourceScanExec( * @param requiredSchema Required schema of the underlying relation, excluding partition columns. * @param partitionFilters Predicates to use for partition pruning. * @param optionalBucketSet Bucket ids for bucket pruning. - * @param optionalNumCoalescedBuckets Number of coalesced buckets. + * @param optionalNewNumBuckets Number of buckets to coalesce or repartition. * @param dataFilters Filters on non-partition columns. * @param tableIdentifier identifier for the table in the metastore. */ @@ -167,7 +167,7 @@ case class FileSourceScanExec( requiredSchema: StructType, partitionFilters: Seq[Expression], optionalBucketSet: Option[BitSet], - optionalNumCoalescedBuckets: Option[Int], + optionalNewNumBuckets: Option[Int], dataFilters: Seq[Expression], tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { @@ -176,8 +176,8 @@ case class FileSourceScanExec( // so that this plan can be canonicalized on executor side too. See SPARK-23731. override lazy val supportsColumnar: Boolean = { val isSplittingBucket = relation.bucketSpec.isDefined && - optionalNumCoalescedBuckets.isDefined && - optionalNumCoalescedBuckets.get > relation.bucketSpec.get.numBuckets + optionalNewNumBuckets.isDefined && + optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isSplittingBucket @@ -299,7 +299,7 @@ case class FileSourceScanExec( // above val spec = relation.bucketSpec.get val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n)) - val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets) + val numPartitions = optionalNewNumBuckets.getOrElse(spec.numBuckets) val partitioning = HashPartitioning(bucketColumns, numPartitions) val sortColumns = spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get) @@ -321,7 +321,7 @@ case class FileSourceScanExec( val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced. - if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) { + if (singleFilePartitions && optionalNewNumBuckets.isEmpty) { // TODO Currently Spark does not support writing columns sorting in descending order // so using Ascending order. 
This can be fixed in future sortColumns.map(attribute => SortOrder(attribute, Ascending)) @@ -367,7 +367,7 @@ case class FileSourceScanExec( } metadata + ("SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" + - optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse(""))) + optionalNewNumBuckets.map { b => s" (Coalesced to $b)"}.getOrElse(""))) } getOrElse { metadata } @@ -555,15 +555,15 @@ case class FileSourceScanExec( filesGroupedToBuckets } - if (optionalNumCoalescedBuckets.isEmpty) { + if (optionalNewNumBuckets.isEmpty) { val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } else { - val newNumBuckets = optionalNumCoalescedBuckets.get + val newNumBuckets = optionalNewNumBuckets.get if (newNumBuckets < bucketSpec.numBuckets) { - logInfo(s"Coalescing to $newNumBuckets buckets") + logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets) val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => val partitionedFiles = coalescedBuckets @@ -574,7 +574,7 @@ case class FileSourceScanExec( } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } else { - logInfo(s"Splitting to $newNumBuckets buckets") + logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => FilePartition( bucketId, @@ -647,7 +647,7 @@ case class FileSourceScanExec( QueryPlan.normalizePredicates( filterUnusedDynamicPruningExpressions(partitionFilters), output), optionalBucketSet, - optionalNumCoalescedBuckets, + optionalNewNumBuckets, QueryPlan.normalizePredicates(dataFilters, output), None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index ace2a11ddaa92..3f16dfade48a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} -import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin +import org.apache.spark.sql.execution.bucketing.CoalesceOrRepartitionBucketsInJoin import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -339,7 +339,7 @@ object QueryExecution { // as the original plan is hidden behind `AdaptiveSparkPlanExec`. 
adaptiveExecutionRule.toSeq ++ Seq( - CoalesceBucketsInJoin(sparkSession.sessionState.conf), + CoalesceOrRepartitionBucketsInJoin(sparkSession.sessionState.conf), PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), RemoveRedundantProjects(sparkSession.sessionState.conf), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala similarity index 93% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index f1fbbb87bcc6c..c8bc7c74b8c2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -38,13 +38,13 @@ import org.apache.spark.sql.internal.SQLConf * - The ratio of the number of buckets is less than the value set in * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. */ -case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { - private def updateNumCoalescedBucketsInScan( +case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { + private def updateNumBucketsInScan( plan: SparkPlan, - numCoalescedBuckets: Int): SparkPlan = { + newNumBuckets: Int): SparkPlan = { plan transformUp { case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + f.copy(optionalNewNumBuckets = Some(newNumBuckets)) } } @@ -55,14 +55,14 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { numCoalescedBuckets: Int): BaseJoinExec = { if (numCoalescedBuckets != numLeftBuckets) { val leftCoalescedChild = - updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets) + updateNumBucketsInScan(join.left, numCoalescedBuckets) join match { case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) } } else { val rightCoalescedChild = - updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets) + updateNumBucketsInScan(join.right, numCoalescedBuckets) join match { case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) @@ -129,7 +129,7 @@ object ExtractJoinWithBuckets { private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { plan.collectFirst { case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && - f.optionalNumCoalescedBuckets.isEmpty => + f.optionalNewNumBuckets.isEmpty => f.relation.bucketSpec.get } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala similarity index 96% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala index 89aee37a4246f..b91c6954f5e53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -29,7 +29,7 @@ 
import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.sql.types.{IntegerType, StructType} -class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { +class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { private val SORT_MERGE_JOIN = "sortMergeJoin" private val SHUFFLED_HASH_JOIN = "shuffledHashJoin" private val BROADCAST_HASH_JOIN = "broadcastHashJoin" @@ -99,12 +99,12 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan) } - val plan = CoalesceBucketsInJoin(spark.sessionState.conf)(join) + val plan = CoalesceOrRepartitionBucketsInJoin(spark.sessionState.conf)(join) def verify(expected: Option[Int], subPlan: SparkPlan): Unit = { val coalesced = subPlan.collect { - case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.nonEmpty => - f.optionalNumCoalescedBuckets.get + case f: FileSourceScanExec if f.optionalNewNumBuckets.nonEmpty => + f.optionalNewNumBuckets.get } if (expected.isDefined) { assert(coalesced.size == 1 && coalesced.head == expected.get) @@ -265,7 +265,7 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { val value = scan.metadata("SelectedBucketsCount") assert(value === "8 out of 8") - val scanWithCoalescing = scan.copy(optionalNumCoalescedBuckets = Some(4)) + val scanWithCoalescing = scan.copy(optionalNewNumBuckets = Some(4)) val valueWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount") assert(valueWithCoalescing == "8 out of 8 (Coalesced to 4)") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 3e520c0bfb059..784bdda28d393 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -960,11 +960,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { assert(shuffles.length == expectedNumShuffles) val scans = plan.collect { - case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f + case f: FileSourceScanExec if f.relation.bucketSpec.isDefined => f } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) - assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + assert(scans.head.optionalNewNumBuckets == expectedCoalescedNumBuckets) } else { assert(scans.isEmpty) } From 21882abe495ce6c1fff297ec7d52928f13a2fcc9 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 18 Aug 2020 17:40:27 -0700 Subject: [PATCH 05/10] tests and cleanup --- .../apache/spark/sql/internal/SQLConf.scala | 29 ++-- .../sql/execution/DataSourceScanExec.scala | 28 ++-- ...DD.scala => BucketRepartitioningRDD.scala} | 12 +- .../CoalesceOrRepartitionBucketsInJoin.scala | 52 ++++--- ...lesceOrRepartitionBucketsInJoinSuite.scala | 2 +- .../spark/sql/sources/BucketedReadSuite.scala | 128 +++++++++++------- 6 files changed, 154 insertions(+), 97 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/{SplitBucketRDD.scala => BucketRepartitioningRDD.scala} (85%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 03d1f78307592..87e2d5ee3b0cc 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2665,24 +2665,24 @@ object SQLConf { .booleanConf .createWithDefault(false) - val SPLIT_BUCKETS_IN_JOIN_ENABLED = - buildConf("spark.sql.bucketing.splitBucketsInJoin.enabled") + val REPARTITION_BUCKETS_IN_JOIN_ENABLED = + buildConf("spark.sql.bucketing.repartitionBucketsInJoin.enabled") .doc("When true, if two bucketed tables with the different number of buckets are joined, " + - "the side with a bigger number of buckets will be coalesced to have the same number " + + "the side with a smaller number of buckets will be repartitioned to have the same number " + "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + - "number of buckets. Bucket coalescing is applied to sort-merge joins and " + - "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " + - "in join, but it also reduces parallelism and could possibly cause OOM for " + - "shuffled hash join.") + "number of buckets. Bucket repartitioning is applied to sort-merge joins and " + + "shuffled hash join. Note: Repartitioning bucketed table can avoid unnecessary shuffling " + + "in join and increases parallelism, but it also reads more data than necessary.") .version("3.1.0") .booleanConf .createWithDefault(false) - val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") - .doc("The ratio of the number of two buckets being coalesced should be less than or " + - "equal to this value for bucket coalescing to be applied. This configuration only " + - s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") + val COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceOrRepartitionBucketsInJoin.maxBucketRatio") + .doc("The ratio of the number of two buckets being coalesced/repartitioned should be " + + "less than or equal to this value for bucket coalescing/repartitioning to be applied. 
" + + s"This configuration only has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' " + + s"or '${REPARTITION_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") @@ -3324,12 +3324,13 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) - def splitBucketsInJoinEnabled: Boolean = getConf(SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED) + def repartitionBucketsInJoinEnabled: Boolean = + getConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED) def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) def coalesceBucketsInJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + getConf(SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) def optimizeNullAwareAntiJoin: Boolean = getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 4694dd04d7195..f7c492679f9d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.bucketing.SplitBucketRDD +import org.apache.spark.sql.execution.bucketing.BucketRepartitioningRDD import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -175,12 +175,16 @@ case class FileSourceScanExec( // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. override lazy val supportsColumnar: Boolean = { - val isSplittingBucket = relation.bucketSpec.isDefined && - optionalNewNumBuckets.isDefined && - optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets + // `RepartitioningBucketRDD` converts columnar batches to rows to calculate bucket id for each + // row, thus columnar is not supported when `RepartitioningBucketRDD` is used to avoid + // conversions from batches to rows and back to batches. + relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets + } - relation.fileFormat.supportBatch(relation.sparkSession, schema) && - !isSplittingBucket + @transient private lazy val isRepartitioningBuckets: Boolean = { + relation.bucketSpec.isDefined && + optionalNewNumBuckets.isDefined && + optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets } private lazy val needsUnsafeRowConversion: Boolean = { @@ -321,7 +325,7 @@ case class FileSourceScanExec( val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced. 
- if (singleFilePartitions && optionalNewNumBuckets.isEmpty) { + if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) { // TODO Currently Spark does not support writing columns sorting in descending order // so using Ascending order. This can be fixed in future sortColumns.map(attribute => SortOrder(attribute, Ascending)) @@ -367,7 +371,9 @@ case class FileSourceScanExec( } metadata + ("SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" + - optionalNewNumBuckets.map { b => s" (Coalesced to $b)"}.getOrElse(""))) + optionalNewNumBuckets.map { b => + if (b > spec.numBuckets) s" (Repartitioned to $b)" else s" (Coalesced to $b)" + }.getOrElse(""))) } getOrElse { metadata } @@ -563,6 +569,7 @@ case class FileSourceScanExec( } else { val newNumBuckets = optionalNewNumBuckets.get if (newNumBuckets < bucketSpec.numBuckets) { + assert(bucketSpec.numBuckets % newNumBuckets == 0) logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets) val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => @@ -574,18 +581,19 @@ case class FileSourceScanExec( } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) } else { + assert(newNumBuckets % bucketSpec.numBuckets == 0) logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets") val filePartitions = Seq.tabulate(newNumBuckets) { bucketId => FilePartition( bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty)) } - // There will be more files read. + // There are now more files to be read. val filesNum = filePartitions.map(_.files.size.toLong).sum val filesSize = filePartitions.map(_.files.map(_.length).sum).sum driverMetrics("numFiles") = filesNum driverMetrics("filesSize") = filesSize - new SplitBucketRDD( + new BucketRepartitioningRDD( fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala similarity index 85% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala index 50ba197e9923a..ad45f3d7d93b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/SplitBucketRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala @@ -28,14 +28,20 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.vectorized.ColumnarBatch -private[spark] class SplitBucketRDD( +/** + * An RDD that filters out the rows that do not belong to the current bucket file being read. 
+ */ +private[spark] class BucketRepartitioningRDD( @transient private val sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], @transient override val filePartitions: Seq[FilePartition], bucketSpec: BucketSpec, - newNumBuckets: Int, + numRepartitionedBuckets: Int, output: Seq[Attribute]) extends FileScanRDD(sparkSession, readFunction, filePartitions) { + assert(numRepartitionedBuckets > bucketSpec.numBuckets) + assert(numRepartitionedBuckets % bucketSpec.numBuckets == 0) + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val iter: Iterator[_] = super.compute(split, context) iter.flatMap { @@ -47,7 +53,7 @@ private[spark] class SplitBucketRDD( private lazy val getBucketId: InternalRow => Int = { val bucketIdExpression = { val bucketColumns = bucketSpec.bucketColumnNames.map(c => output.find(_.name == c).get) - HashPartitioning(bucketColumns, newNumBuckets).partitionIdExpression + HashPartitioning(bucketColumns, numRepartitionedBuckets).partitionIdExpression } val projection = UnsafeProjection.create(Seq(bucketIdExpression), output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index c8bc7c74b8c2f..445510bf5d307 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.bucketing import scala.annotation.tailrec +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} @@ -48,24 +49,22 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP } } - private def updateNumCoalescedBuckets( + private def updateNumBuckets( join: BaseJoinExec, numLeftBuckets: Int, numRightBucket: Int, - numCoalescedBuckets: Int): BaseJoinExec = { - if (numCoalescedBuckets != numLeftBuckets) { - val leftCoalescedChild = - updateNumBucketsInScan(join.left, numCoalescedBuckets) + newNumBuckets: Int): BaseJoinExec = { + if (newNumBuckets != numLeftBuckets) { + val leftChild = updateNumBucketsInScan(join.left, newNumBuckets) join match { - case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) - case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) + case j: SortMergeJoinExec => j.copy(left = leftChild) + case j: ShuffledHashJoinExec => j.copy(left = leftChild) } } else { - val rightCoalescedChild = - updateNumBucketsInScan(join.right, numCoalescedBuckets) + val rightChild = updateNumBucketsInScan(join.right, newNumBuckets) join match { - case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) - case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) + case j: SortMergeJoinExec => j.copy(right = rightChild) + case j: ShuffledHashJoinExec => j.copy(right = rightChild) } } } @@ -83,10 +82,16 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP } def apply(plan: SparkPlan): SparkPlan = { - if (!conf.coalesceBucketsInJoinEnabled && !conf.splitBucketsInJoinEnabled) { + if (!conf.coalesceBucketsInJoinEnabled && !conf.repartitionBucketsInJoinEnabled) { return plan } + if 
(conf.coalesceBucketsInJoinEnabled && conf.repartitionBucketsInJoinEnabled) { + throw new AnalysisException("Both 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' and " + + "'spark.sql.bucketing.repartitionBucketsInJoin.enabled' cannot be set to true at the " + + "same time") + } + plan transform { case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= @@ -98,12 +103,18 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP } join match { case j: SortMergeJoinExec => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) - case j: ShuffledHashJoinExec - // Only coalesce the buckets for shuffled hash join stream side, - // to avoid OOM for build side. - if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets) => - updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case j: ShuffledHashJoinExec => + if (conf.repartitionBucketsInJoinEnabled) { + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + } else if (conf.coalesceBucketsInJoinEnabled && + isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets)) { + // Only coalesce the buckets for shuffled hash join stream side, + // to avoid OOM for build side. + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + } else { + j + } case other => other } case other => other @@ -159,8 +170,9 @@ object ExtractJoinWithBuckets { private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = { val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) - // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller - // number of buckets because bucket id is calculated by modding the total number of buckets. + // A bucket can be coalesced or repartitioned only if the bigger number of buckets is divisible + // by the smaller number of buckets because bucket id is calculated by modding the total number + // of buckets. 
numBuckets1 != numBuckets2 && large % small == 0 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala index b91c6954f5e53..c6c7b60e80c43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -175,7 +175,7 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp test("the ratio of the number of buckets is greater than max allowed") { withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { run(JoinSetting( RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 784bdda28d393..7587f984b1fa5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -901,7 +901,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - test("bucket coalescing is not satisfied") { + test("bucket coalescing/repartitioning is not satisfied") { def run(testSpec1: BucketedTableTestSpec, testSpec2: BucketedTableTestSpec): Unit = { Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs => testBucketing( @@ -911,35 +911,50 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { - // Coalescing buckets is disabled by a config. - run( - BucketedTableTestSpec( - Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "false").foreach { enabledConfig => + withSQLConf(enabledConfig) { + // Coalescing/repartitioning buckets is disabled by a config. + run( + BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) + } } - withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { - // Coalescing buckets is not applied because the ratio of the number of buckets (3) - // is greater than max allowed (2). 
- run( - BucketedTableTestSpec( - Some(BucketSpec(12, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enabledConfig => + withSQLConf( + enabledConfig, + SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + // Coalescing/repartitioning buckets is not applied because the ratio of + // the number of buckets (3) is greater than max allowed (2). + run( + BucketedTableTestSpec( + Some(BucketSpec(12, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) + } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run( - // Coalescing buckets is not applied because the bigger number of buckets (8) is not - // divisible by the smaller number of buckets (7). - BucketedTableTestSpec( - Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(7, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enabledConfig => + withSQLConf(enabledConfig) { + run( + // Coalescing/repartitioning buckets is not applied because the bigger number of + // buckets (8) is not divisible by the smaller number of buckets (7). + BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(7, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) + } } } @@ -960,7 +975,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { assert(shuffles.length == expectedNumShuffles) val scans = plan.collect { - case f: FileSourceScanExec if f.relation.bucketSpec.isDefined => f + case f: FileSourceScanExec if f.optionalNewNumBuckets.isDefined => f } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) @@ -983,35 +998,50 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - test("split shuffle") { + test("bucket repartitioning eliminates shuffle") { + withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + // The side with testSpec2 will be repartitioned to have 8 output partitions. 
+ val testSpec1 = BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = false, + expectedSort = false, + expectedNumOutputPartitions = Some(8)) + val testSpec2 = BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = false, + expectedSort = false, + expectedNumOutputPartitions = Some(8)) + + Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs => + testBucketing( + bucketedTableTestSpecLeft = specs._1, + bucketedTableTestSpecRight = specs._2, + joinCondition = joinCondition(Seq("i", "j"))) + } + } + } + + test("bucket repartitioning should work with wholestage codegen enabled") { withTable("t1", "t2") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i").saveAsTable("t2") - // scalastyle:off println -// withSQLConf( -// SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "false", -// SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", -// SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { -// val t1 = spark.table("t1") -// val t2 = spark.table("t2") -// val joined = t1.join(t2, t1("i") === t2("i"), "right") -// joined.explain -// println(joined.count) -// val expected = joined.collect() -// } - - withSQLConf(SQLConf.SPLIT_BUCKETS_IN_JOIN_ENABLED.key -> "true", + + val t1 = spark.table("t1") + val t2 = spark.table("t2") + val expected = t1.join(t2, t1("i") === t2("i")).collect + + withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", - SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { val t1 = spark.table("t1") val t2 = spark.table("t2") - val joined2 = t1.join(t2, t1("i") === t2("i")) - joined2.explain - joined2.show - println(joined2.count) - // checkAnswer(joined2, expected) + val actual = t1.join(t2, t1("i") === t2("i")) + val plan = actual.queryExecution.executedPlan + assert(plan.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty) + checkAnswer(actual, expected) } - // scalastyle:on println } } } From 5665bc1107d6f9f06d1663a6f1ba8fa2ef5491e5 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 28 Aug 2020 14:36:33 -0700 Subject: [PATCH 06/10] add test --- ...lesceOrRepartitionBucketsInJoinSuite.scala | 242 +++++++++++------- 1 file changed, 144 insertions(+), 98 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala index c6c7b60e80c43..7f9755b399707 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -37,12 +37,12 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp case class RelationSetting( cols: Seq[Attribute], numBuckets: Int, - expectedCoalescedNumBuckets: Option[Int]) + expectedNumBuckets: Option[Int]) object RelationSetting { - def apply(numBuckets: Int, expectedCoalescedNumBuckets: Option[Int]): RelationSetting = { + def apply(numBuckets: Int, expectedNumBuckets: Option[Int]): RelationSetting = { val cols = Seq(AttributeReference("i", IntegerType)()) - RelationSetting(cols, numBuckets, expectedCoalescedNumBuckets) + RelationSetting(cols, 
numBuckets, expectedNumBuckets) } } @@ -80,13 +80,14 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp leftKeys = setting.rightKeys, rightKeys = setting.leftKeys, leftRelation = setting.rightRelation, - rightRelation = setting.leftRelation) + rightRelation = setting.leftRelation, + shjBuildSide = setting.shjBuildSide.map { + case BuildLeft => BuildRight + case BuildRight => BuildLeft + }) + + val settings = Seq(setting, swappedSetting) - val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN) { - Seq(setting, swappedSetting) - } else { - Seq(setting) - } settings.foreach { s => val lScan = newFileSourceScanExec(s.leftRelation) val rScan = newFileSourceScanExec(s.rightRelation) @@ -102,19 +103,19 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp val plan = CoalesceOrRepartitionBucketsInJoin(spark.sessionState.conf)(join) def verify(expected: Option[Int], subPlan: SparkPlan): Unit = { - val coalesced = subPlan.collect { + val optionalNewNumBuckets = subPlan.collect { case f: FileSourceScanExec if f.optionalNewNumBuckets.nonEmpty => f.optionalNewNumBuckets.get } if (expected.isDefined) { - assert(coalesced.size == 1 && coalesced.head == expected.get) + assert(optionalNewNumBuckets.size == 1 && optionalNewNumBuckets.head == expected.get) } else { - assert(coalesced.isEmpty) + assert(optionalNewNumBuckets.isEmpty) } } - verify(s.leftRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].left) - verify(s.rightRelation.expectedCoalescedNumBuckets, plan.asInstanceOf[BinaryExecNode].right) + verify(s.leftRelation.expectedNumBuckets, plan.asInstanceOf[BinaryExecNode].left) + verify(s.rightRelation.expectedNumBuckets, plan.asInstanceOf[BinaryExecNode].right) } } @@ -136,9 +137,32 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - test("bucket coalescing should work only for sort merge join and shuffled hash join") { - Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) { + test("bucket repartitioning - basic") { + withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(buildSide))) + } + } + + withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "false") { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, None), joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(JoinSetting( + RelationSetting(8, None), RelationSetting(4, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(buildSide))) + } + } + } + + test("bucket coalesce/repartition should work only for sort merge join and shuffled hash join") { + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => + withSQLConf(enableConfig) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) } @@ -153,120 +177,142 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - test("bucket coalescing shouldn't be applied when the number of buckets are the same") { - 
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + test("bucket coalesce/repartition shouldn't be applied when the number of buckets are the same") { + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => + withSQLConf(enableConfig) { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } } } test("number of bucket is not divisible by other number of bucket") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => + withSQLConf(enableConfig) { + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } } } test("the ratio of the number of buckets is greater than max allowed") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) - run(JoinSetting( - RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => + withSQLConf(enableConfig, + SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } } } test("join keys should match with output partitioning") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - val lCols = Seq( - AttributeReference("l1", IntegerType)(), - AttributeReference("l2", IntegerType)()) - val rCols = Seq( - AttributeReference("r1", IntegerType)(), - AttributeReference("r2", IntegerType)()) - - val lRel = RelationSetting(lCols, 4, None) - val rRel = RelationSetting(rCols, 8, None) - - // The following should not be coalesced because join keys do not match with output - // partitioning (missing one expression). 
- run(JoinSetting( - leftKeys = Seq(lCols.head), - rightKeys = Seq(rCols.head), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) + val lCols = Seq( + AttributeReference("l1", IntegerType)(), + AttributeReference("l2", IntegerType)()) + val rCols = Seq( + AttributeReference("r1", IntegerType)(), + AttributeReference("r2", IntegerType)()) - run(JoinSetting( - leftKeys = Seq(lCols.head), - rightKeys = Seq(rCols.head), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + val lRel = RelationSetting(lCols, 4, None) + val rRel = RelationSetting(rCols, 8, None) - // The following should not be coalesced because join keys do not match with output - // partitioning (more expressions). - run(JoinSetting( - leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), - rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) + Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => + withSQLConf(enableConfig) { + // The following should not be coalesced because join keys do not match with output + // partitioning (missing one expression). + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) - run(JoinSetting( - leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), - rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), - leftRelation = lRel, - rightRelation = rRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + + // The following should not be coalesced because join keys do not match with output + // partitioning (more expressions). + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + } + } + + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { // The following will be coalesced since ordering should not matter because it will be // adjusted in `EnsureRequirements`. - run(JoinSetting( + val setting = JoinSetting( leftKeys = lCols.reverse, rightKeys = rCols.reverse, leftRelation = lRel, rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = SORT_MERGE_JOIN, - shjBuildSide = None)) + joinOperator = "", + shjBuildSide = None) - run(JoinSetting( + run(setting.copy(joinOperator = SORT_MERGE_JOIN)) + run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) + } + + withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + // The following will be repartitioned since ordering should not matter because it will be + // adjusted in `EnsureRequirements`. 
+ val setting = JoinSetting( leftKeys = lCols.reverse, rightKeys = rCols.reverse, - leftRelation = lRel, - rightRelation = RelationSetting(rCols, 8, Some(4)), - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildLeft))) + leftRelation = RelationSetting(lCols, 4, Some(8)), + rightRelation = rRel, + joinOperator = "", + shjBuildSide = None) - run(JoinSetting( - leftKeys = rCols.reverse, - rightKeys = lCols.reverse, - leftRelation = RelationSetting(rCols, 8, Some(4)), - rightRelation = lRel, - joinOperator = SHUFFLED_HASH_JOIN, - shjBuildSide = Some(BuildRight))) + run(setting.copy(joinOperator = SORT_MERGE_JOIN)) + Seq(BuildLeft, BuildRight).foreach { buildSide => + run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(buildSide))) + } } } - test("FileSourceScanExec's metadata should be updated with coalesced info") { + test("FileSourceScanExec's metadata should be updated with coalesced/repartitioned info") { val scan = newFileSourceScanExec(RelationSetting(8, None)) val value = scan.metadata("SelectedBucketsCount") assert(value === "8 out of 8") val scanWithCoalescing = scan.copy(optionalNewNumBuckets = Some(4)) - val valueWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount") - assert(valueWithCoalescing == "8 out of 8 (Coalesced to 4)") + val metadataWithCoalescing = scanWithCoalescing.metadata("SelectedBucketsCount") + assert(metadataWithCoalescing == "8 out of 8 (Coalesced to 4)") + + val scanWithRepartitioning = scan.copy(optionalNewNumBuckets = Some(16)) + val metadataWithRepartitioning = scanWithRepartitioning.metadata("SelectedBucketsCount") + assert(metadataWithRepartitioning == "8 out of 8 (Repartitioned to 16)") } } From e2374ac281bbcb23c0dc49786ce7d8148f9761bd Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 28 Aug 2020 18:14:20 -0700 Subject: [PATCH 07/10] Change config to use enum --- .../apache/spark/sql/internal/SQLConf.scala | 55 +++++++++---------- .../CoalesceOrRepartitionBucketsInJoin.scala | 43 +++++++-------- .../org/apache/spark/sql/ExplainSuite.scala | 31 ++++++----- ...lesceOrRepartitionBucketsInJoinSuite.scala | 51 +++++++++-------- .../spark/sql/sources/BucketedReadSuite.scala | 47 ++++++++-------- 5 files changed, 118 insertions(+), 109 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 09b2071f12547..3c866dc518ffa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2655,37 +2655,34 @@ object SQLConf { .booleanConf .createWithDefault(true) - val COALESCE_BUCKETS_IN_JOIN_ENABLED = - buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled") - .doc("When true, if two bucketed tables with the different number of buckets are joined, " + - "the side with a bigger number of buckets will be coalesced to have the same number " + - "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + - "number of buckets. Bucket coalescing is applied to sort-merge joins and " + - "shuffled hash join. 
Note: Coalescing bucketed table can avoid unnecessary shuffling " + - "in join, but it also reduces parallelism and could possibly cause OOM for " + - "shuffled hash join.") - .version("3.1.0") - .booleanConf - .createWithDefault(false) + object BucketReadStrategyForJoin extends Enumeration { + val COALESCE, REPARTITION, OFF = Value + } - val REPARTITION_BUCKETS_IN_JOIN_ENABLED = - buildConf("spark.sql.bucketing.repartitionBucketsInJoin.enabled") - .doc("When true, if two bucketed tables with the different number of buckets are joined, " + - "the side with a smaller number of buckets will be repartitioned to have the same number " + - "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + - "number of buckets. Bucket repartitioning is applied to sort-merge joins and " + - "shuffled hash join. Note: Repartitioning bucketed table can avoid unnecessary shuffling " + - "in join and increases parallelism, but it also reads more data than necessary.") + val BUCKET_READ_STRATEGY_FOR_JOIN = + buildConf("spark.sql.bucketing.bucketReadStrategyForJoin") + .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " + + "are joined, the side with a bigger number of buckets will be coalesced to have the same " + + "number of buckets as the other side. When set to REPARTITION, the side with a bigger " + + "number of buckets will be repartitioned to have the same number of buckets as the other " + + "side. The bigger number of buckets must be divisible by the smaller number of buckets, " + + "and the strategy is applied to sort-merge joins and shuffled hash joins. " + + "Note: Coalescing bucketed table can avoid unnecessary shuffle in join, but it also " + + "reduces parallelism and could possibly cause OOM for shuffled hash join. Repartitioning " + + "bucketed table avoids unnecessary shuffle in join while maintaining the parallelism " + + "at the cost of reading duplicate data.") .version("3.1.0") - .booleanConf - .createWithDefault(false) + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(BucketReadStrategyForJoin.values.map(_.toString)) + .createWithDefault(BucketReadStrategyForJoin.OFF.toString) val COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = buildConf("spark.sql.bucketing.coalesceOrRepartitionBucketsInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced/repartitioned should be " + "less than or equal to this value for bucket coalescing/repartitioning to be applied. 
" + - s"This configuration only has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' " + - s"or '${REPARTITION_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") + s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_FOR_JOIN.key}' " + + s"is set to a strategy other than '${BucketReadStrategyForJoin.OFF}'.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") @@ -3338,12 +3335,7 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) - def repartitionBucketsInJoinEnabled: Boolean = - getConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED) - - def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) - - def coalesceBucketsInJoinMaxBucketRatio: Int = + def coalesceOrRepartitionBucketsInJoinMaxBucketRatio: Int = getConf(SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) def optimizeNullAwareAntiJoin: Boolean = @@ -3351,6 +3343,9 @@ class SQLConf extends Serializable with Logging { def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR) + def bucketReadStrategyForJoin: BucketReadStrategyForJoin.Value = + BucketReadStrategyForJoin.withName(getConf(BUCKET_READ_STRATEGY_FOR_JOIN)) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index 445510bf5d307..ba73cfd378474 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.bucketing import scala.annotation.tailrec -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} @@ -28,16 +27,21 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyForJoin /** - * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin` + * This rule coalesces or repartitions one side of the `SortMergeJoin` and `ShuffledHashJoin` * if the following conditions are met: * - Two bucketed tables are joined. * - Join keys match with output partition expressions on their respective sides. * - The larger bucket number is divisible by the smaller bucket number. - * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. + * COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. 
+ * + * The bucketed table with a larger number of buckets is coalesced if BUCKET_READ_STRATEGY_FOR_JOIN + * is set to BucketReadStrategyForJoin.COALESCE, whereas the bucketed table with a smaller number + * of buckets is repartitioned if BUCKET_READ_STRATEGY_FOR_JOIN is set to + * BucketReadStrategyForJoin.REPARTITION. */ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { private def updateNumBucketsInScan( @@ -82,21 +86,16 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP } def apply(plan: SparkPlan): SparkPlan = { - if (!conf.coalesceBucketsInJoinEnabled && !conf.repartitionBucketsInJoinEnabled) { + val bucketReadStrategy = conf.bucketReadStrategyForJoin + if (bucketReadStrategy == BucketReadStrategyForJoin.OFF) { return plan } - if (conf.coalesceBucketsInJoinEnabled && conf.repartitionBucketsInJoinEnabled) { - throw new AnalysisException("Both 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' and " + - "'spark.sql.bucketing.repartitionBucketsInJoin.enabled' cannot be set to true at the" + - "same time") - } - plan transform { case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= - conf.coalesceBucketsInJoinMaxBucketRatio => - val newNumBuckets = if (conf.coalesceBucketsInJoinEnabled) { + conf.coalesceOrRepartitionBucketsInJoinMaxBucketRatio => + val newNumBuckets = if (bucketReadStrategy == BucketReadStrategyForJoin.COALESCE) { math.min(numLeftBuckets, numRightBuckets) } else { math.max(numLeftBuckets, numRightBuckets) @@ -105,15 +104,15 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP case j: SortMergeJoinExec => updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) case j: ShuffledHashJoinExec => - if (conf.repartitionBucketsInJoinEnabled) { - updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) - } else if (conf.coalesceBucketsInJoinEnabled && - isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets)) { - // Only coalesce the buckets for shuffled hash join stream side, - // to avoid OOM for build side. - updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) - } else { - j + bucketReadStrategy match { + case BucketReadStrategyForJoin.REPARTITION => + updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case BucketReadStrategyForJoin.COALESCE + if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets) => + // Only coalesce the buckets for shuffled hash join stream side, + // to avoid OOM for build side. 
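+            // (Coalescing the build side would fold several buckets into a single task's
+            // in-memory hash relation, which is why only the stream side is eligible here.)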
+ updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) + case _ => j } case other => other } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index e43a4147ceb63..6f330c9489fb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -344,19 +344,24 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } - test("Coalesced bucket info should be a part of explain string") { - withTable("t1", "t2") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { - Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") - Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") - val df1 = spark.table("t1") - val df2 = spark.table("t2") - val joined = df1.join(df2, df1("i") === df2("i")) - checkKeywordsExistsInExplain( - joined, - SimpleMode, - "SelectedBucketsCount: 8 out of 8 (Coalesced to 4)" :: Nil: _*) + test("Coalesced/repartitioned bucket info should be a part of explain string") { + Seq((SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + "8 out of 8 (Coalesced to 4)"), + (SQLConf.BucketReadStrategyForJoin.REPARTITION.toString, + "4 out of 4 (Repartitioned to 8)")).foreach { case (strategy, expected) => + withTable("t1", "t2") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") + Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") + val df1 = spark.table("t1") + val df2 = spark.table("t2") + val joined = df1.join(df2, df1("i") === df2("i")) + checkKeywordsExistsInExplain( + joined, + SimpleMode, + s"SelectedBucketsCount: $expected" :: Nil: _*) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala index 7f9755b399707..3d4e5966eaf83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -120,7 +120,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalescing - basic") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -128,7 +129,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp shjBuildSide = Some(BuildLeft))) } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.OFF.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -138,7 +140,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket repartitioning - basic") { - withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + 
withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { run(JoinSetting( RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SORT_MERGE_JOIN)) Seq(BuildLeft, BuildRight).foreach { buildSide => @@ -148,7 +151,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "false") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.OFF.toString) { run(JoinSetting( RelationSetting(8, None), RelationSetting(4, None), joinOperator = SORT_MERGE_JOIN)) Seq(BuildLeft, BuildRight).foreach { buildSide => @@ -160,9 +164,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalesce/repartition should work only for sort merge join and shuffled hash join") { - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => - withSQLConf(enableConfig) { + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) } @@ -170,7 +174,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalescing shouldn't be applied to shuffled hash join build side") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildRight))) @@ -178,9 +183,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalesce/repartition shouldn't be applied when the number of buckets are the same") { - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => - withSQLConf(enableConfig) { + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -191,9 +196,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("number of bucket is not divisible by other number of bucket") { - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => - withSQLConf(enableConfig) { + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -204,9 +209,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("the ratio of the number of buckets is greater than max allowed") { - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", 
- SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => - withSQLConf(enableConfig, + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy, SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { run(JoinSetting( RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) @@ -228,9 +233,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp val lRel = RelationSetting(lCols, 4, None) val rRel = RelationSetting(rCols, 8, None) - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enableConfig => - withSQLConf(enableConfig) { + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { // The following should not be coalesced because join keys do not match with output // partitioning (missing one expression). run(JoinSetting( @@ -269,7 +274,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { // The following will be coalesced since ordering should not matter because it will be // adjusted in `EnsureRequirements`. val setting = JoinSetting( @@ -284,7 +290,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) } - withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { // The following will be repartitioned since ordering should not matter because it will be // adjusted in `EnsureRequirements`. val setting = JoinSetting( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 7587f984b1fa5..29a0b5e277274 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -876,7 +876,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket coalescing eliminates shuffle") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. // Currently, sort will be introduced for the side that is coalesced. val testSpec1 = BucketedTableTestSpec( @@ -911,24 +912,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "false").foreach { enabledConfig => - withSQLConf(enabledConfig) { - // Coalescing/repartitioning buckets is disabled by a config. 
- run( - BucketedTableTestSpec( - Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), - expectedShuffle = false), - BucketedTableTestSpec( - Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), - expectedShuffle = true)) - } + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.OFF.toString) { + // Bucket read strategy is off. + run( + BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = false), + BucketedTableTestSpec( + Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), + expectedShuffle = true)) } - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enabledConfig => + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => withSQLConf( - enabledConfig, + SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy, SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing/repartitioning buckets is not applied because the ratio of // the number of buckets (3) is greater than max allowed (2). @@ -942,9 +941,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - Seq(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", - SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true").foreach { enabledConfig => - withSQLConf(enabledConfig) { + Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { run( // Coalescing/repartitioning buckets is not applied because the bigger number of // buckets (8) is not divisible by the smaller number of buckets (7). @@ -965,7 +964,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { def verify( query: String, expectedNumShuffles: Int, @@ -999,7 +999,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket repartitioning eliminates shuffle") { - withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { // The side with testSpec2 will be repartitioned to have 8 output partitions. 
val testSpec1 = BucketedTableTestSpec( Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), @@ -1032,7 +1033,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { val t2 = spark.table("t2") val expected = t1.join(t2, t1("i") === t2("i")).collect - withSQLConf(SQLConf.REPARTITION_BUCKETS_IN_JOIN_ENABLED.key -> "true", + withSQLConf( + SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> + SQLConf.BucketReadStrategyForJoin.REPARTITION.toString, SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { val t1 = spark.table("t1") From 7481e36d8781e869a0dc558e0af5d358a56ab150 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 28 Aug 2020 20:00:51 -0700 Subject: [PATCH 08/10] Modify config name / update tests --- .../apache/spark/sql/internal/SQLConf.scala | 26 ++--- .../CoalesceOrRepartitionBucketsInJoin.scala | 22 ++--- .../org/apache/spark/sql/ExplainSuite.scala | 6 +- ...lesceOrRepartitionBucketsInJoinSuite.scala | 60 ++++++------ .../spark/sql/sources/BucketedReadSuite.scala | 94 ++++++++++--------- 5 files changed, 105 insertions(+), 103 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3c866dc518ffa..1b24d0ed54fcb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2655,12 +2655,12 @@ object SQLConf { .booleanConf .createWithDefault(true) - object BucketReadStrategyForJoin extends Enumeration { + object BucketReadStrategyInJoin extends Enumeration { val COALESCE, REPARTITION, OFF = Value } - val BUCKET_READ_STRATEGY_FOR_JOIN = - buildConf("spark.sql.bucketing.bucketReadStrategyForJoin") + val BUCKET_READ_STRATEGY_IN_JOIN = + buildConf("spark.sql.bucketing.bucketReadStrategyInJoin") .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " + "are joined, the side with a bigger number of buckets will be coalesced to have the same " + "number of buckets as the other side. When set to REPARTITION, the side with a bigger " + @@ -2674,15 +2674,15 @@ object SQLConf { .version("3.1.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) - .checkValues(BucketReadStrategyForJoin.values.map(_.toString)) - .createWithDefault(BucketReadStrategyForJoin.OFF.toString) + .checkValues(BucketReadStrategyInJoin.values.map(_.toString)) + .createWithDefault(BucketReadStrategyInJoin.OFF.toString) - val COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceOrRepartitionBucketsInJoin.maxBucketRatio") + val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced/repartitioned should be " + "less than or equal to this value for bucket coalescing/repartitioning to be applied. 
" + - s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_FOR_JOIN.key}' " + - s"is set to a strategy other than '${BucketReadStrategyForJoin.OFF}'.") + s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_IN_JOIN.key}' " + + s"is set to a strategy other than '${BucketReadStrategyInJoin.OFF}'.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") @@ -3335,16 +3335,16 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) - def coalesceOrRepartitionBucketsInJoinMaxBucketRatio: Int = - getConf(SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + def bucketReadStrategyInJoinMaxBucketRatio: Int = + getConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO) def optimizeNullAwareAntiJoin: Boolean = getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN) def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR) - def bucketReadStrategyForJoin: BucketReadStrategyForJoin.Value = - BucketReadStrategyForJoin.withName(getConf(BUCKET_READ_STRATEGY_FOR_JOIN)) + def bucketReadStrategyInJoin: BucketReadStrategyInJoin.Value = + BucketReadStrategyInJoin.withName(getConf(BUCKET_READ_STRATEGY_IN_JOIN)) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index ba73cfd378474..23423aacf56d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyForJoin +import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyInJoin /** * This rule coalesces or repartitions one side of the `SortMergeJoin` and `ShuffledHashJoin` @@ -38,10 +38,10 @@ import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyForJoin * - The ratio of the number of buckets is less than the value set in * COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. * - * The bucketed table with a larger number of buckets is coalesced if BUCKET_READ_STRATEGY_FOR_JOIN - * is set to BucketReadStrategyForJoin.COALESCE, whereas the bucketed table with a smaller number - * of buckets is repartitioned if BUCKET_READ_STRATEGY_FOR_JOIN is set to - * BucketReadStrategyForJoin.REPARTITION. + * The bucketed table with a larger number of buckets is coalesced if BUCKET_READ_STRATEGY_IN_JOIN + * is set to BucketReadStrategyInJoin.COALESCE, whereas the bucketed table with a smaller number of + * buckets is repartitioned if BUCKET_READ_STRATEGY_IN_JOIN is set to + * BucketReadStrategyInJoin.REPARTITION. 
*/ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { private def updateNumBucketsInScan( @@ -86,16 +86,16 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP } def apply(plan: SparkPlan): SparkPlan = { - val bucketReadStrategy = conf.bucketReadStrategyForJoin - if (bucketReadStrategy == BucketReadStrategyForJoin.OFF) { + val bucketReadStrategy = conf.bucketReadStrategyInJoin + if (bucketReadStrategy == BucketReadStrategyInJoin.OFF) { return plan } plan transform { case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= - conf.coalesceOrRepartitionBucketsInJoinMaxBucketRatio => - val newNumBuckets = if (bucketReadStrategy == BucketReadStrategyForJoin.COALESCE) { + conf.bucketReadStrategyInJoinMaxBucketRatio => + val newNumBuckets = if (bucketReadStrategy == BucketReadStrategyInJoin.COALESCE) { math.min(numLeftBuckets, numRightBuckets) } else { math.max(numLeftBuckets, numRightBuckets) @@ -105,9 +105,9 @@ case class CoalesceOrRepartitionBucketsInJoin(conf: SQLConf) extends Rule[SparkP updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) case j: ShuffledHashJoinExec => bucketReadStrategy match { - case BucketReadStrategyForJoin.REPARTITION => + case BucketReadStrategyInJoin.REPARTITION => updateNumBuckets(j, numLeftBuckets, numRightBuckets, newNumBuckets) - case BucketReadStrategyForJoin.COALESCE + case BucketReadStrategyInJoin.COALESCE if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, newNumBuckets) => // Only coalesce the buckets for shuffled hash join stream side, // to avoid OOM for build side. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 6f330c9489fb7..4621406e862a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -345,13 +345,13 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } test("Coalesced/repartitioned bucket info should be a part of explain string") { - Seq((SQLConf.BucketReadStrategyForJoin.COALESCE.toString, + Seq((SQLConf.BucketReadStrategyInJoin.COALESCE.toString, "8 out of 8 (Coalesced to 4)"), - (SQLConf.BucketReadStrategyForJoin.REPARTITION.toString, + (SQLConf.BucketReadStrategyInJoin.REPARTITION.toString, "4 out of 4 (Repartitioned to 8)")).foreach { case (strategy, expected) => withTable("t1", "t2") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") val df1 = spark.table("t1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala index 3d4e5966eaf83..d1fca669c7c1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoinSuite.scala @@ -120,8 +120,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } 
test("bucket coalescing - basic") { - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -129,8 +129,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp shjBuildSide = Some(BuildLeft))) } - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.OFF.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -140,8 +140,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket repartitioning - basic") { - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { run(JoinSetting( RelationSetting(8, None), RelationSetting(4, Some(8)), joinOperator = SORT_MERGE_JOIN)) Seq(BuildLeft, BuildRight).foreach { buildSide => @@ -151,8 +151,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.OFF.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { run(JoinSetting( RelationSetting(8, None), RelationSetting(4, None), joinOperator = SORT_MERGE_JOIN)) Seq(BuildLeft, BuildRight).foreach { buildSide => @@ -164,9 +164,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalesce/repartition should work only for sort merge join and shuffled hash join") { - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) } @@ -174,8 +174,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalescing shouldn't be applied to shuffled hash join build side") { - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { run(JoinSetting( RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildRight))) @@ -183,9 +183,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("bucket coalesce/repartition shouldn't be applied when the number of buckets are the same") { - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> 
strategy) { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -196,9 +196,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("number of bucket is not divisible by other number of bucket") { - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { run(JoinSetting( RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -209,10 +209,10 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } test("the ratio of the number of buckets is greater than max allowed") { - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy, - SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy, + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { run(JoinSetting( RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( @@ -233,9 +233,9 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp val lRel = RelationSetting(lCols, 4, None) val rRel = RelationSetting(rCols, 8, None) - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { // The following should not be coalesced because join keys do not match with output // partitioning (missing one expression). run(JoinSetting( @@ -274,8 +274,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp } } - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { // The following will be coalesced since ordering should not matter because it will be // adjusted in `EnsureRequirements`. 
val setting = JoinSetting( @@ -290,8 +290,8 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN, shjBuildSide = Some(BuildLeft))) } - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { // The following will be repartitioned since ordering should not matter because it will be // adjusted in `EnsureRequirements`. val setting = JoinSetting( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 29a0b5e277274..fbe547f37cde2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -876,8 +876,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket coalescing eliminates shuffle") { - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.COALESCE.toString) { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. // Currently, sort will be introduced for the side that is coalesced. val testSpec1 = BucketedTableTestSpec( @@ -912,8 +912,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.OFF.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.OFF.toString) { // Bucket read strategy is off. run( BucketedTableTestSpec( @@ -924,11 +924,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { expectedShuffle = true)) } - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => withSQLConf( - SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy, - SQLConf.COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy, + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing/repartitioning buckets is not applied because the ratio of // the number of buckets (3) is greater than max allowed (2). run( @@ -941,9 +941,9 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - Seq(SQLConf.BucketReadStrategyForJoin.COALESCE.toString, - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString).foreach { strategy => - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> strategy) { + Seq(SQLConf.BucketReadStrategyInJoin.COALESCE.toString, + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString).foreach { strategy => + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy) { run( // Coalescing/repartitioning buckets is not applied because the bigger number of // buckets (8) is not divisible by the smaller number of buckets (7). 
@@ -957,50 +957,52 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - test("bucket coalescing is applied when join expressions match with partitioning expressions") { + test("bucket coalesce/repartition is applied when join expressions match with partitioning " + + "expressions") { withTable("t1", "t2") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") - withSQLConf( - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.COALESCE.toString) { - def verify( - query: String, - expectedNumShuffles: Int, - expectedCoalescedNumBuckets: Option[Int]): Unit = { - val plan = sql(query).queryExecution.executedPlan - val shuffles = plan.collect { case s: ShuffleExchangeExec => s } - assert(shuffles.length == expectedNumShuffles) - - val scans = plan.collect { - case f: FileSourceScanExec if f.optionalNewNumBuckets.isDefined => f - } - if (expectedCoalescedNumBuckets.isDefined) { - assert(scans.length == 1) - assert(scans.head.optionalNewNumBuckets == expectedCoalescedNumBuckets) - } else { - assert(scans.isEmpty) + Seq((SQLConf.BucketReadStrategyInJoin.COALESCE, 4), + (SQLConf.BucketReadStrategyInJoin.REPARTITION, 8)).foreach { case (strategy, numBuckets) => + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> strategy.toString) { + def verify( + query: String, + expectedNumShuffles: Int, + expectedNumBuckets: Option[Int]): Unit = { + val plan = sql(query).queryExecution.executedPlan + val shuffles = plan.collect { case s: ShuffleExchangeExec => s } + assert(shuffles.length == expectedNumShuffles) + + val scans = plan.collect { + case f: FileSourceScanExec if f.optionalNewNumBuckets.isDefined => f + } + if (expectedNumBuckets.isDefined) { + assert(scans.length == 1) + assert(scans.head.optionalNewNumBuckets == expectedNumBuckets) + } else { + assert(scans.isEmpty) + } } - } - // Coalescing applied since join expressions match with the bucket columns. - verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(4)) - // Coalescing applied when columns are aliased. - verify( - "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y", - 0, - Some(4)) - // Coalescing is not applied when join expressions do not match with bucket columns. - verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None) + // Bucket strategy applied since join expressions match with the bucket columns. + verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i AND t1.j = t2.j", 0, Some(numBuckets)) + // Bucket strategy applied when columns are aliased. + verify( + "SELECT * FROM t1 JOIN (SELECT i AS x, j AS y FROM t2) ON t1.i = x AND t1.j = y", + 0, + Some(numBuckets)) + // Bucket strategy is not applied when join expressions do not match with bucket columns. + verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None) + } } } } test("bucket repartitioning eliminates shuffle") { - withSQLConf(SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString) { + withSQLConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString) { // The side with testSpec2 will be repartitioned to have 8 output partitions. 
val testSpec1 = BucketedTableTestSpec( Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), @@ -1034,8 +1036,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { val expected = t1.join(t2, t1("i") === t2("i")).collect withSQLConf( - SQLConf.BUCKET_READ_STRATEGY_FOR_JOIN.key -> - SQLConf.BucketReadStrategyForJoin.REPARTITION.toString, + SQLConf.BUCKET_READ_STRATEGY_IN_JOIN.key -> + SQLConf.BucketReadStrategyInJoin.REPARTITION.toString, SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { val t1 = spark.table("t1") From 2c4925ba5258b3728608e77a3fb56250c1c15bbb Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 29 Aug 2020 21:31:29 -0700 Subject: [PATCH 09/10] Address comments --- .../apache/spark/sql/internal/SQLConf.scala | 21 ++++++++++++------- .../sql/execution/DataSourceScanExec.scala | 9 +++++--- .../bucketing/BucketRepartitioningRDD.scala | 14 ++----------- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1b24d0ed54fcb..2fa03bed97883 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2660,13 +2660,18 @@ object SQLConf { } val BUCKET_READ_STRATEGY_IN_JOIN = - buildConf("spark.sql.bucketing.bucketReadStrategyInJoin") - .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " + - "are joined, the side with a bigger number of buckets will be coalesced to have the same " + - "number of buckets as the other side. When set to REPARTITION, the side with a bigger " + - "number of buckets will be repartitioned to have the same number of buckets as the other " + - "side. The bigger number of buckets must be divisible by the smaller number of buckets, " + - "and the strategy is applied to sort-merge joins and shuffled hash joins. " + + buildConf("spark.sql.sources.bucketing.readStrategyInJoin") + .doc("The bucket read strategy can be set to one of " + + BucketReadStrategyInJoin.values.mkString(", ") + + s". When set to ${BucketReadStrategyInJoin.COALESCE}, if two bucketed tables with " + "different number of buckets are joined, the side with a bigger number of buckets will " + "be coalesced to have the same number of buckets as the smaller side. When set to " + s"${BucketReadStrategyInJoin.REPARTITION}, the side with a smaller number of buckets " + "will be repartitioned to have the same number of buckets as the bigger side. For either " + "coalescing or repartitioning to be applied, the bigger number of buckets must be " + "divisible by the smaller number of buckets, and the strategy is applied to sort-merge " + s"joins and shuffled hash joins. By default, the read strategy is set to " + s"${BucketReadStrategyInJoin.OFF}, and neither coalescing nor repartitioning is applied. " + "Note: Coalescing bucketed table can avoid unnecessary shuffle in join, but it also " + "reduces parallelism and could possibly cause OOM for shuffled hash join. 
Repartitioning " + "bucketed table avoids unnecessary shuffle in join while maintaining the parallelism " + @@ -2678,7 +2683,7 @@ object SQLConf { .createWithDefault(BucketReadStrategyInJoin.OFF.toString) val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio") + buildConf("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced/repartitioned should be " + "less than or equal to this value for bucket coalescing/repartitioning to be applied. " + s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_IN_JOIN.key}' " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 7aa318588d018..91158272360a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -180,8 +180,7 @@ case class FileSourceScanExec( } @transient private lazy val isRepartitioningBuckets: Boolean = { - relation.bucketSpec.isDefined && - optionalNewNumBuckets.isDefined && + bucketedScan && optionalNewNumBuckets.isDefined && optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets } @@ -593,7 +592,11 @@ case class FileSourceScanExec( driverMetrics("numFiles") = filesNum driverMetrics("filesSize") = filesSize new BucketRepartitioningRDD( - fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output) + fsRelation.sparkSession, + readFile, + filePartitions, + outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression, + output) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala index ad45f3d7d93b5..71f9072b64c4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala @@ -22,9 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{Partition, TaskContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection} import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -35,12 +33,9 @@ private[spark] class BucketRepartitioningRDD( @transient private val sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], @transient override val filePartitions: Seq[FilePartition], - bucketSpec: BucketSpec, - numRepartitionedBuckets: Int, + bucketIdExpression: Expression, output: Seq[Attribute]) extends FileScanRDD(sparkSession, readFunction, filePartitions) { - assert(numRepartitionedBuckets > bucketSpec.numBuckets) - assert(numRepartitionedBuckets % bucketSpec.numBuckets == 0) override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val iter: Iterator[_] = super.compute(split, context) @@ -51,11 +46,6 
@@ private[spark] class BucketRepartitioningRDD( } private lazy val getBucketId: InternalRow => Int = { - val bucketIdExpression = { - val bucketColumns = bucketSpec.bucketColumnNames.map(c => output.find(_.name == c).get) - HashPartitioning(bucketColumns, numRepartitionedBuckets).partitionIdExpression - } - val projection = UnsafeProjection.create(Seq(bucketIdExpression), output) row => projection(row).getInt(0) } From 366c9c3b9a8872113d7ccf670b3e8bfbb53f892e Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 29 Aug 2020 21:41:33 -0700 Subject: [PATCH 10/10] PR comment --- .../bucketing/CoalesceOrRepartitionBucketsInJoin.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala index 23423aacf56d0..01508f80471ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyInJoin * - Join keys match with output partition expressions on their respective sides. * - The larger bucket number is divisible by the smaller bucket number. * - The ratio of the number of buckets is less than the value set in - * COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO. + * BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO. * * The bucketed table with a larger number of buckets is coalesced if BUCKET_READ_STRATEGY_IN_JOIN * is set to BucketReadStrategyInJoin.COALESCE, whereas the bucketed table with a smaller number of
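As a rough usage sketch (illustrative only, not part of the patch: df1 and df2 stand for arbitrary DataFrames with columns i and j, and t1/t2 are placeholder table names), the REPARTITION read strategy would be exercised along these lines:

// Hypothetical end-to-end sketch of the REPARTITION read strategy; names are placeholders.
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin", "REPARTITION")
// Disable broadcast joins so the bucketed join path (e.g. sort-merge join) is taken.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")

// With REPARTITION, the 4-bucket side is read with 8 output partitions, so the join on the
// bucket columns should not need a shuffle on either side; with COALESCE, the 8-bucket side
// would instead be read with 4 output partitions.
val joined = spark.table("t1").join(spark.table("t2"), Seq("i", "j"))
joined.explain()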