From 97cb91dd21c13bc7e5847730e45d20d37705038d Mon Sep 17 00:00:00 2001
From: stczwd
Date: Thu, 19 Dec 2019 11:20:28 +0800
Subject: [PATCH 1/8] [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by

Change-Id: I9ec887eece29abed048192b559f7d69a9e67afe3
---
 .../exchange/EnsureRequirements.scala              |  4 ++++
 .../spark/sql/execution/PlannerSuite.scala         | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 068e0164443dd..77a8b305251f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -55,6 +55,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
+      case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
+          distribution: OrderedDistribution) =>
+        ShuffleExchangeExec(
+          distribution.createPartitioning(partitioning.numPartitions), child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(defaultNumPreShufflePartitions)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3dea0b1ce937c..129ab79454762 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -421,6 +421,24 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

+  test("EnsureRequirements replace Exchange " +
+    "if child has SortExec and RoundRobinPartitioning") {
+    val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
+    val partitioning = RoundRobinPartitioning(5)
+    assert(!partitioning.satisfies(distribution))
+
+    val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
+      global = true,
+      child = ShuffleExchangeExec(
+        partitioning,
+        DummySparkPlan(outputPartitioning = partitioning)))
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assert(outputPlan.find{
+      case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
+      case _ => false}.isEmpty,
+      "RoundRobinPartitioning should be changed to RangePartitioning")
+  }
+
   test("EnsureRequirements does not eliminate Exchange with different partitioning") {
     val distribution = ClusteredDistribution(Literal(1) :: Nil)
     val partitioning = HashPartitioning(Literal(2) :: Nil, 5)

From 4c9b6b124e5897868a9210c7b457d63ff57c7b63 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Thu, 19 Dec 2019 12:48:20 +0800
Subject: [PATCH 2/8] add issue number in test message

Change-Id: If6b4c1f818c38b1862f69acc63f79feea127bbee
---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 129ab79454762..e0803adadecdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -421,7 +421,7 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

-  test("EnsureRequirements replace Exchange " +
test("EnsureRequirements replace Exchange " + + test("SPARK-30036: EnsureRequirements replace Exchange " + "if child has SortExec and RoundRobinPartitioning") { val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil) val partitioning = RoundRobinPartitioning(5) From 02267db3f32b8a7a4f6333a8d73b4b74598a8b4d Mon Sep 17 00:00:00 2001 From: lijunqing Date: Thu, 19 Dec 2019 16:17:24 +0800 Subject: [PATCH 3/8] fix suite tests Change-Id: Ieb757a218588e2f35efd1b0eac4d076fb75eb1c8 --- .../scala/org/apache/spark/sql/ConfigBehaviorSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala index 56ae904e83fdb..5887a22ec7aef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 10000 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. - val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + // Range has range partitioning in its output now. + val data = spark.range(0, n, 1, 10).sort($"id".desc) .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect() // Compute histogram for the number of records per partition post sort @@ -55,12 +54,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) { // The default chi-sq value should be low - assert(computeChiSquareTest() < 100) + assert(computeChiSquareTest() < 10) withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") { // If we only sample one point, the range boundaries will be pretty bad and the // chi-sq value would be very high. 
-        assert(computeChiSquareTest() > 300)
+        assert(computeChiSquareTest() > 100)
       }
     }
   }

From 56a1101e63bfd124c94409cadadc6edd650661e2 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Fri, 20 Dec 2019 11:11:33 +0800
Subject: [PATCH 4/8] suite repartition(int, Seq[expression])

Change-Id: I6b102f32b4084625875b395990e8ac4673c56bac
---
 .../exchange/EnsureRequirements.scala              |  6 ++---
 .../spark/sql/execution/PlannerSuite.scala         | 26 ++++++++++++++++---
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 77a8b305251f2..033404ccac44d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -55,10 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
-      case (ShuffleExchangeExec(partitioning: RoundRobinPartitioning, child, _),
-          distribution: OrderedDistribution) =>
-        ShuffleExchangeExec(
-          distribution.createPartitioning(partitioning.numPartitions), child)
+      case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
+        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(defaultNumPreShufflePartitions)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index e0803adadecdb..728b1fd8ce3a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -421,8 +421,8 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

-  test("SPARK-30036: EnsureRequirements replace Exchange " +
-    "if child has SortExec and RoundRobinPartitioning") {
+  test("SPARK-30036: Romove unnecessary RoundRobinPartitioning " +
+    "if SortExec is followed by RoundRobinPartitioning") {
     val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
     val partitioning = RoundRobinPartitioning(5)
     assert(!partitioning.satisfies(distribution))
@@ -433,12 +433,30 @@ class PlannerSuite extends SharedSparkSession {
         partitioning,
         DummySparkPlan(outputPartitioning = partitioning)))
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
-    assert(outputPlan.find{
-      case e: ShuffleExchangeExec => e.outputPartitioning.isInstanceOf[RoundRobinPartitioning]
+    assert(outputPlan.find {
+      case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
       case _ => false}.isEmpty,
       "RoundRobinPartitioning should be changed to RangePartitioning")
   }

+  test("SPARK-30036: Romove unnecessary HashPartitioning " +
+    "if SortExec is followed by HashPartitioning") {
+    val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
+    val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
+    assert(!partitioning.satisfies(distribution))
+
+    val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
+      global = true,
+      child = ShuffleExchangeExec(
+        partitioning,
+        DummySparkPlan(outputPartitioning = partitioning)))
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assert(outputPlan.find {
+      case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
+      case _ => false}.isEmpty,
+      "HashPartitioning should be changed to RangePartitioning")
+  }
+
   test("EnsureRequirements does not eliminate Exchange with different partitioning") {
     val distribution = ClusteredDistribution(Literal(1) :: Nil)
     val partitioning = HashPartitioning(Literal(2) :: Nil, 5)

From 5915a124d1cba718c15bf861e60fd7c4b4dee472 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Sat, 21 Dec 2019 00:01:53 +0800
Subject: [PATCH 5/8] change code style

Change-Id: I0b55a61e1a9ac3555177322ac44d2b216d45bd24
---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 728b1fd8ce3a2..6a06ea580e1e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -435,7 +435,8 @@ class PlannerSuite extends SharedSparkSession {
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assert(outputPlan.find {
       case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
-      case _ => false}.isEmpty,
+      case _ => false
+    }.isEmpty,
       "RoundRobinPartitioning should be changed to RangePartitioning")
   }

@@ -453,7 +454,8 @@ class PlannerSuite extends SharedSparkSession {
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assert(outputPlan.find {
       case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
-      case _ => false}.isEmpty,
+      case _ => false
+    }.isEmpty,
       "HashPartitioning should be changed to RangePartitioning")
   }

From fa03fcbbb08c1cdcd3f25bc1b8fb03d7a8535cf0 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Tue, 24 Dec 2019 09:45:23 +0800
Subject: [PATCH 6/8] add end-to-end unit tests

Change-Id: I863524b569d8838acb862747cbb6b96ae9bf3bd9
---
 .../apache/spark/sql/ConfigBehaviorSuite.scala     |  1 -
 .../spark/sql/execution/PlannerSuite.scala         | 16 ++++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index 5887a22ec7aef..0e090c6772d41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -39,7 +39,6 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      // Range has range partitioning in its output now.
       val data = spark.range(0, n, 1, 10).sort($"id".desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 6a06ea580e1e9..36e36f9ee8388 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -438,6 +438,14 @@ class PlannerSuite extends SharedSparkSession {
       case _ => false
     }.isEmpty,
       "RoundRobinPartitioning should be changed to RangePartitioning")
+
+    val query = testData.select('key, 'value).repartition(2).sort('key.asc)
+    assert(query.queryExecution.sparkPlan.collectLeaves().count{
+      case _: ShuffleExchangeExec => true
+      case _ => false
+    } == 1)
+    assert(query.rdd.getNumPartitions == 2)
+    assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
   }

   test("SPARK-30036: Romove unnecessary HashPartitioning " +
@@ -457,6 +465,14 @@ class PlannerSuite extends SharedSparkSession {
       case _ => false
     }.isEmpty,
       "HashPartitioning should be changed to RangePartitioning")
+
+    val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc)
+    assert(query.queryExecution.sparkPlan.collectLeaves().count{
+      case _: ShuffleExchangeExec => true
+      case _ => false
+    } == 1)
+    assert(query.rdd.getNumPartitions == 5)
+    assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20))
   }

   test("EnsureRequirements does not eliminate Exchange with different partitioning") {

From 52ce6603aa4e3695360b54f2481bd9a0f9142016 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Tue, 24 Dec 2019 13:09:31 +0800
Subject: [PATCH 7/8] fix unit test

Change-Id: I19b2f5d7cb35e8482ec7b92b79ca29294a6e3029
---
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 36e36f9ee8388..4e4b3eae03594 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -440,10 +440,6 @@ class PlannerSuite extends SharedSparkSession {
       "RoundRobinPartitioning should be changed to RangePartitioning")

     val query = testData.select('key, 'value).repartition(2).sort('key.asc)
-    assert(query.queryExecution.sparkPlan.collectLeaves().count{
-      case _: ShuffleExchangeExec => true
-      case _ => false
-    } == 1)
     assert(query.rdd.getNumPartitions == 2)
     assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
   }
@@ -467,10 +463,6 @@ class PlannerSuite extends SharedSparkSession {
       "HashPartitioning should be changed to RangePartitioning")

     val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc)
-    assert(query.queryExecution.sparkPlan.collectLeaves().count{
-      case _: ShuffleExchangeExec => true
-      case _ => false
-    } == 1)
     assert(query.rdd.getNumPartitions == 5)
     assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20))
   }

From d2615b61a6e5f7ed849f826d25d45315953386c9 Mon Sep 17 00:00:00 2001
From: lijunqing
Date: Tue, 24 Dec 2019 16:47:07 +0800
Subject: [PATCH 8/8] change comments

Change-Id: I175b3824ba9ce46fba0ebba6ebf0b220d64de42c
---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4e4b3eae03594..017e548809413 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -421,7 +421,7 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

-  test("SPARK-30036: Romove unnecessary RoundRobinPartitioning " +
+  test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " +
     "if SortExec is followed by RoundRobinPartitioning") {
     val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
     val partitioning = RoundRobinPartitioning(5)
@@ -444,7 +444,7 @@ class PlannerSuite extends SharedSparkSession {
     assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
   }

-  test("SPARK-30036: Romove unnecessary HashPartitioning " +
+  test("SPARK-30036: Remove unnecessary HashPartitioning " +
     "if SortExec is followed by HashPartitioning") {
     val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
     val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
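
A minimal end-to-end sketch of the query shape these patches target, for illustration only: the object name, session settings, and printed checks below are assumptions made to keep the example self-contained, and are not part of the patch series. It builds the repartition-then-sort query from SPARK-30036 and inspects how many shuffle exchanges the physical plan contains and how many partitions the result keeps; the PlannerSuite tests above assert the analogous properties.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

object Spark30036Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-30036 sketch")
      .master("local[4]")
      // Keep the physical plan simple to inspect for this sketch.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    import spark.implicits._

    // Explicit repartition followed by a global sort: before the
    // EnsureRequirements change above, the round-robin shuffle from
    // repartition(10) is kept and a second, range-partitioned shuffle is added
    // for the sort; with the change, a single range-partitioned shuffle with
    // the repartition's partition count satisfies the sort's OrderedDistribution.
    val query = spark.range(0, 10000).repartition(10).sort($"id".desc)

    // Count ShuffleExchangeExec nodes in the physical plan.
    val shuffles = query.queryExecution.executedPlan.collect {
      case e: ShuffleExchangeExec => e
    }
    println(s"shuffle exchanges in plan: ${shuffles.size}")
    // With the patches applied, this stays at 10 (see the PlannerSuite tests).
    println(s"result partitions: ${query.rdd.getNumPartitions}")

    spark.stop()
  }
}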