
Commit 2e08778

address comments
1 parent 028b0ac commit 2e08778

4 files changed: 31 additions & 29 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 6 deletions
@@ -278,23 +278,24 @@ object SQLConf {
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
     .doc("When true, enable adaptive query execution.")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
       .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
+      .checkValue(_ > 0, "The minimum shuffle partition number " +
        "must be a positive integer.")
       .createWithDefault(1)
 
   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
+        "By default it equals spark.sql.shuffle.partitions.")
       .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
+      .checkValue(_ > 0, "The maximum shuffle partition number " +
        "must be a positive integer.")
-      .createWithDefault(500)
+      .createOptional
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1735,7 +1736,8 @@ class SQLConf extends Serializable with Logging {
 
   def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
-  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
+  def maxNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
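For context, the fallback introduced here is just an Option.getOrElse over the now-optional entry. A minimal standalone sketch of that pattern follows; the values are hypothetical and the vals stand in for SQLConf lookups rather than calling Spark APIs:

    // Unset optional entry, as after .createOptional with no user-supplied value.
    val maxNumPostShufflePartitions: Option[Int] = None
    // Stand-in for spark.sql.shuffle.partitions (hypothetical value).
    val numShufflePartitions: Int = 200

    // Mirrors SQLConf.maxNumPostShufflePartitions after this commit:
    // fall back to the shuffle partition number when the key is unset.
    val effectiveMax: Int = maxNumPostShufflePartitions.getOrElse(numShufflePartitions)
    assert(effectiveMax == 200)

Note also that the default of spark.sql.adaptive.enabled flips to false above, so adaptive execution (and with it these partition limits) now has to be enabled explicitly.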

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.exchange._
  * There are 2 kinds of query stages:
  * 1. Shuffle query stage. This stage materializes its output to shuffle files, and Spark launches
  *    another job to execute the further operators.
- * 2. Broadcast stage. This stage materializes its output to an array in driver JVM. Spark
+ * 2. Broadcast query stage. This stage materializes its output to an array in the driver JVM. Spark
  *    broadcasts the array before executing the further operators.
  */
 abstract class QueryStageExec extends LeafExecNode {
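To make the two kinds concrete, here is a simplified, self-contained model of the distinction the comment draws; these toy classes are illustrative stand-ins, not the actual Spark operators:

    // Both stage kinds materialize their output before further operators
    // run; they differ only in where the materialized output lives.
    sealed trait ToyQueryStage {
      def materialize(): Unit
    }

    // Shuffle query stage: output goes to shuffle files, and another
    // job is launched to execute the further operators.
    final class ToyShuffleStage extends ToyQueryStage {
      def materialize(): Unit = println("writing shuffle files")
    }

    // Broadcast query stage: output is collected to an array in the
    // driver JVM and broadcast before the further operators execute.
    final class ToyBroadcastStage extends ToyQueryStage {
      def materialize(): Unit = println("collecting to driver, then broadcasting")
    }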

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageManager.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
  *
  * When one query stage finishes materialization, a list of adaptive optimizer rules will be
  * executed, trying to optimize the query plan with the data statistics collected from the
- * materialized data. Then we travers the query plan again and try to insert more query stages.
+ * materialized data. Then we traverse the query plan again and try to insert more query stages.
  *
  * To create query stages, we traverse the query tree bottom up. When we hit an exchange node,
  * and all the child query stages of this exchange node are materialized, we create a new
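The comment above sketches the stage-insertion algorithm; a hedged illustration of that bottom-up traversal over a toy plan tree follows. The node types are hypothetical stand-ins for Spark's physical operators, not the real QueryStageManager logic:

    // Toy plan nodes: a generic operator, an exchange, and a stage that
    // may or may not have finished materializing.
    sealed trait ToyPlan
    final case class ToyOp(children: Seq[ToyPlan]) extends ToyPlan
    final case class ToyExchange(child: ToyPlan) extends ToyPlan
    final case class ToyStage(child: ToyPlan, materialized: Boolean) extends ToyPlan

    // True when every stage in the subtree has materialized its output.
    def allStagesMaterialized(plan: ToyPlan): Boolean = plan match {
      case ToyStage(_, done)  => done
      case ToyExchange(child) => allStagesMaterialized(child)
      case ToyOp(children)    => children.forall(allStagesMaterialized)
    }

    // Bottom up: recurse into children first; when an exchange's child
    // stages are all materialized, wrap it in a new (not yet materialized)
    // stage, as the comment describes.
    def insertStages(plan: ToyPlan): ToyPlan = plan match {
      case ToyExchange(child) =>
        val newChild = insertStages(child)
        if (allStagesMaterialized(newChild)) {
          ToyStage(ToyExchange(newChild), materialized = false)
        } else {
          ToyExchange(newChild)
        }
      case ToyOp(children) => ToyOp(children.map(insertStages))
      case stage: ToyStage => stage
    }

Under this sketch, calling insertStages again after each stage materializes gradually wraps more of the tree in stages, matching the event-loop behavior the comment describes.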

sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala

Lines changed: 21 additions & 21 deletions
@@ -527,47 +527,47 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll
     spark.sql("SET spark.sql.exchange.reuse=true")
     val df = spark.range(1).selectExpr("id AS key", "id AS value")
 
-    // test case 1: a fragment has 3 child fragments but they are the same fragment.
-    // ResultQueryFragment 1
-    //   ShuffleQueryFragment 0
-    //   ReusedQueryFragment 0
-    //   ReusedQueryFragment 0
+    // test case 1: a query stage has 3 child stages but they are the same stage.
+    // ResultQueryStage 1
+    //   ShuffleQueryStage 0
+    //   ReusedQueryStage 0
+    //   ReusedQueryStage 0
     val resultDf = df.join(df, "key").join(df, "key")
     val finalPlan = resultDf.queryExecution.executedPlan
       .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
     assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2)
     assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3)
     checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
 
-    // test case 2: a fragment has 2 parent fragments.
-    // ResultQueryFragment 3
-    //   ShuffleQueryFragment 1
-    //     ShuffleQueryFragment 0
-    //   ShuffleQueryFragment 2
-    //     ReusedQueryFragment 0
+    // test case 2: a query stage has 2 parent stages.
+    // ResultQueryStage 3
+    //   ShuffleQueryStage 1
+    //     ShuffleQueryStage 0
+    //   ShuffleQueryStage 2
+    //     ReusedQueryStage 0
     val grouped = df.groupBy("key").agg(max("value").as("value"))
     val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
       .union(grouped.groupBy(col("key") + 2).max("value"))
 
     val finalPlan2 = resultDf2.queryExecution.executedPlan
       .asInstanceOf[AdaptiveSparkPlanExec].finalPlan
 
-    // The result fragment has 2 children
-    val level1Fragments = finalPlan2.collect { case q: QueryStageExec => q }
-    assert(level1Fragments.length == 2)
+    // The result stage has 2 children
+    val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+    assert(level1Stages.length == 2)
 
-    val leafFragments = level1Fragments.flatMap { fragment =>
-      // All of the child fragments of result fragment have only one child fragment.
-      val children = fragment.plan.collect { case q: QueryStageExec => q }
+    val leafStages = level1Stages.flatMap { stage =>
+      // All of the child stages of the result stage have only one child stage.
+      val children = stage.plan.collect { case q: QueryStageExec => q }
       assert(children.length == 1)
       children
     }
-    assert(leafFragments.length == 2)
+    assert(leafStages.length == 2)
 
-    val reusedFragments = level1Fragments.flatMap { fragment =>
-      fragment.plan.collect { case r: ReusedQueryStageExec => r }
+    val reusedStages = level1Stages.flatMap { stage =>
+      stage.plan.collect { case r: ReusedQueryStageExec => r }
     }
-    assert(reusedFragments.length == 1)
+    assert(reusedStages.length == 1)
 
     checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
   }
