Skip to content

Commit cc57d70

Browse files
committed
[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (cleanup after SPARK-8317)
SPARK-8317 changed the SQL Exchange operator so that it no longer pushed sorting into Spark's shuffle layer, a change which allowed more efficient SQL-specific sorters to be used. This patch performs some leftover cleanup based on those changes:

- Exchange's constructor should no longer accept a `newOrdering` since it's no longer used and no longer works as expected.
- `addOperatorsIfNecessary` looked at shuffle input's output ordering to decide whether to sort, but this is the wrong node to be examining: it needs to look at whether the post-shuffle node has the right ordering, since shuffling will not preserve row orderings. Thanks to davies for spotting this.

Author: Josh Rosen <[email protected]>

Closes apache#7407 from JoshRosen/SPARK-9050 and squashes the following commits:

e70be50 [Josh Rosen] No need to wrap line
e866494 [Josh Rosen] Refactor addOperatorsIfNecessary to make code clearer
2e467da [Josh Rosen] Remove `newOrdering` from Exchange.
1 parent e965a79 commit cc57d70

File tree

2 files changed

+16
-24
lines changed

2 files changed

+16
-24
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 15 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -35,21 +35,13 @@ import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEn
3535

3636
/**
3737
* :: DeveloperApi ::
38-
* Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
39-
* resulting partition based on expressions from the partition key. It is invalid to construct an
40-
* exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.
38+
* Performs a shuffle that will result in the desired `newPartitioning`.
4139
*/
4240
@DeveloperApi
43-
case class Exchange(
44-
newPartitioning: Partitioning,
45-
newOrdering: Seq[SortOrder],
46-
child: SparkPlan)
47-
extends UnaryNode {
41+
case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
4842

4943
override def outputPartitioning: Partitioning = newPartitioning
5044

51-
override def outputOrdering: Seq[SortOrder] = newOrdering
52-
5345
override def output: Seq[Attribute] = child.output
5446

5547
/**
@@ -279,23 +271,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
279271
partitioning: Partitioning,
280272
rowOrdering: Seq[SortOrder],
281273
child: SparkPlan): SparkPlan = {
282-
val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
283-
val needsShuffle = child.outputPartitioning != partitioning
284274

285-
val withShuffle = if (needsShuffle) {
286-
Exchange(partitioning, Nil, child)
287-
} else {
288-
child
275+
def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
276+
if (child.outputPartitioning != partitioning) {
277+
Exchange(partitioning, child)
278+
} else {
279+
child
280+
}
289281
}
290282

291-
val withSort = if (needSort) {
292-
sqlContext.planner.BasicOperators.getSortOperator(
293-
rowOrdering, global = false, withShuffle)
294-
} else {
295-
withShuffle
283+
def addSortIfNecessary(child: SparkPlan): SparkPlan = {
284+
if (rowOrdering.nonEmpty && child.outputOrdering != rowOrdering) {
285+
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
286+
} else {
287+
child
288+
}
296289
}
297290

298-
withSort
291+
addSortIfNecessary(addShuffleIfNecessary(child))
299292
}
300293

301294
if (meetsRequirements && compatible && !needsAnySort) {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -360,8 +360,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
360360
case logical.OneRowRelation =>
361361
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
362362
case logical.RepartitionByExpression(expressions, child) =>
363-
execution.Exchange(
364-
HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
363+
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
365364
case e @ EvaluatePython(udf, child, _) =>
366365
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
367366
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil

0 commit comments

Comments
 (0)