Commit 81f57fe

jiangxb1987 authored and bersprockets committed
[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1)
In `ShuffleExchangeExec`, we don't need to insert an extra local sort before round-robin partitioning if the new partitioning has only 1 partition, because in that case all output rows go to the same partition.

Tested with the existing test cases.

Author: Xingbo Jiang <[email protected]>

Closes #20426 from jiangxb1987/repartition1.
1 parent 8d2d558 commit 81f57fe

2 files changed, +5 -1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala

Lines changed: 4 additions & 0 deletions

@@ -253,7 +253,11 @@ object ShuffleExchange {
     //
     // Currently we following the most straight-forward way that perform a local sort before
     // partitioning.
+    //
+    // Note that we don't perform local sort if the new partitioning has only 1 partition, under
+    // that case all output rows go to the same partition.
     val newRdd = if (SparkEnv.get.conf.get(SQLConf.SORT_BEFORE_REPARTITION) &&
+        newPartitioning.numPartitions > 1 &&
         newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
       rdd.mapPartitionsInternal { iter =>
         val recordComparatorSupplier = new Supplier[RecordComparator] {
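
The reasoning behind the new `numPartitions > 1` guard can be illustrated with a small, Spark-free sketch. It is a simplified model, not Spark's implementation: `RoundRobinSketch`, `roundRobin`, and `needsLocalSort` are hypothetical names introduced here, and rows are assigned purely by position starting from partition 0. Under that assumption, the set of rows landing in each partition depends on input order when there are several partitions, but cannot depend on it when there is only one.

```scala
// Illustrative model only; these names are hypothetical and not part of Spark.
object RoundRobinSketch {

  // Assign rows to partitions by position, a simplified stand-in for
  // round-robin partitioning. Returns the set of rows in each partition.
  def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[Int, Set[T]] =
    rows.zipWithIndex
      .groupBy { case (_, idx) => idx % numPartitions }
      .map { case (partition, pairs) => partition -> pairs.map(_._1).toSet }

  // Mirrors the shape of the condition in the diff: sort only when the
  // config flag is on, the partitioning is round-robin, and there is
  // more than one target partition.
  def needsLocalSort(
      sortBeforeRepartition: Boolean,
      numPartitions: Int,
      isRoundRobin: Boolean): Boolean =
    sortBeforeRepartition && numPartitions > 1 && isRoundRobin

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq(1, 2, 3, 4)
    val retryAttempt = Seq(2, 1, 4, 3) // same rows, different arrival order

    // With 2 partitions, unsorted input yields different partition contents.
    println(roundRobin(firstAttempt, 2) == roundRobin(retryAttempt, 2))
    // Sorting first makes the assignment independent of arrival order.
    println(roundRobin(firstAttempt.sorted, 2) == roundRobin(retryAttempt.sorted, 2))
    // With 1 partition, the contents are the same whether or not we sort.
    println(roundRobin(firstAttempt, 1) == roundRobin(retryAttempt, 1))
  }
}
```

In this model the sort buys nothing for a single partition, which is the case the commit skips. The order of rows inside that single partition can still follow the input order.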

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -162,7 +162,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       val allEvents = ForeachSinkSuite.allEvents()
       assert(allEvents.size === 1)
       assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
-      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 2))
+      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
 
       // `close` should be called with the error
       val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
