
Commit 33212cb

JoshRosen authored and Andrew Or committed
[SPARK-13168][SQL] Collapse adjacent repartition operators
Spark SQL should collapse adjacent `Repartition` operators and only keep the last one.

Author: Josh Rosen <[email protected]>

Closes #11064 from JoshRosen/collapse-repartition.
1 parent 085f510
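To make the effect concrete before diving into the diff, here is a minimal DataFrame-level sketch (assuming a Spark 1.6-style `sqlContext`; the chain mirrors the one exercised in PlannerSuite below):

// Minimal sketch, assuming a Spark 1.6-style sqlContext is in scope.
val chained = sqlContext.range(100).repartition(10).repartition(20).coalesce(5)
// Without this patch, the optimized logical plan keeps all three Repartition
// operators (two shuffles plus the coalesce). With CollapseRepartition, only
// the outermost one (coalesce(5), i.e. numPartitions = 5, shuffle = false)
// survives:
println(chained.queryExecution.optimizedPlan)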

File tree

6 files changed (+33, −10 lines changed)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 14 additions & 2 deletions

@@ -68,7 +68,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       PushPredicateThroughAggregate,
       ColumnPruning,
       // Operator combine
-      ProjectCollapsing,
+      CollapseRepartition,
+      CollapseProject,
       CombineFilters,
       CombineLimits,
       CombineUnions,
@@ -322,7 +323,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
  * Combines two adjacent [[Project]] operators into one and perform alias substitution,
  * merging the expressions into one single expression.
  */
-object ProjectCollapsing extends Rule[LogicalPlan] {
+object CollapseProject extends Rule[LogicalPlan] {

   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p @ Project(projectList1, Project(projectList2, child)) =>
@@ -390,6 +391,16 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Combines adjacent [[Repartition]] operators by keeping only the last one.
+ */
+object CollapseRepartition extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case r @ Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
+      Repartition(numPartitions, shuffle, child)
+  }
+}
+
 /**
  * Simplifies LIKE expressions that do not need full regular expressions to evaluate the condition.
  * For example, when the expression is just checking to see if a string starts with a given
@@ -857,6 +868,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 /**
  * Splits join condition expressions into three categories based on the attributes required
  * to evaluate them.
+ *
  * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
  */
 private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
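Why a single bottom-up pass is enough even for chains of three or more `Repartition` operators: `transformUp` rewrites children before parents, so each inner pair is merged before the node above it is inspected. A self-contained toy model of that mechanic (plain Scala, not Catalyst; all names here are illustrative):

// Toy plan ADT standing in for Catalyst's LogicalPlan.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan

// Bottom-up rewrite: transform the child first, then apply the rule at this node.
def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
  val rewritten = plan match {
    case Repartition(n, s, child) => Repartition(n, s, transformUp(child)(rule))
    case leaf => leaf
  }
  rule.applyOrElse(rewritten, identity[Plan])
}

// Same shape as the rule above: keep the outer operator, drop the inner one.
val collapseRepartition: PartialFunction[Plan, Plan] = {
  case Repartition(n, s, Repartition(_, _, grandchild)) => Repartition(n, s, grandchild)
}

val chain = Repartition(5, shuffle = false,
  Repartition(20, shuffle = true,
    Repartition(10, shuffle = true, Relation("t"))))

// The whole chain collapses to the outermost operator in one pass.
assert(transformUp(chain)(collapseRepartition) == Repartition(5, shuffle = false, Relation("t")))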
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{ProjectCollapsingSuite.scala → CollapseProjectSuite.scala}

Lines changed: 2 additions & 2 deletions

@@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

-class ProjectCollapsingSuite extends PlanTest {
+class CollapseProjectSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", FixedPoint(10), EliminateSubQueries) ::
-        Batch("ProjectCollapsing", Once, ProjectCollapsing) :: Nil
+        Batch("CollapseProject", Once, CollapseProject) :: Nil
   }

   val testRelation = LocalRelation('a.int, 'b.int)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ class FilterPushdownSuite extends PlanTest {
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
         ColumnPruning,
-        ProjectCollapsing) :: Nil
+        CollapseProject) :: Nil
   }

   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ class JoinOrderSuite extends PlanTest {
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
         ColumnPruning,
-        ProjectCollapsing) :: Nil
+        CollapseProject) :: Nil

   }
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 13 additions & 2 deletions

@@ -21,8 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row, SQLConf}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
 import org.apache.spark.sql.functions._
@@ -223,6 +222,18 @@ class PlannerSuite extends SharedSQLContext {
     }
   }

+  test("collapse adjacent repartitions") {
+    val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
+    def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
+    assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+    assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
+    doubleRepartitioned.queryExecution.optimizedPlan match {
+      case r: Repartition =>
+        assert(r.numPartitions === 5)
+        assert(r.shuffle === false)
+    }
+  }
+
   // --- Unit tests of EnsureRequirements ---------------------------------------------------------

   // When it comes to testing whether EnsureRequirements properly ensures distribution requirements,
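One detail that makes the arithmetic in this test read correctly: `coalesce(n)` is also planned as a `Repartition` operator, just with `shuffle = false`, so the unoptimized logical plan genuinely contains three such nodes. Roughly (a sketch of the plan shape, not exact printed output):

Repartition numPartitions = 5, shuffle = false        <- coalesce(5)
+- Repartition numPartitions = 20, shuffle = true     <- repartition(20)
   +- Repartition numPartitions = 10, shuffle = true  <- repartition(10)
      +- <testData>

After optimization only the outermost node remains, which is why the test expects numPartitions = 5 and shuffle = false.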

sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -188,7 +188,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         // The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
         // `Aggregate`s to perform type casting. This rule merges these `Project`s into
         // `Aggregate`s.
-        ProjectCollapsing,
+        CollapseProject,

         // Used to handle other auxiliary `Project`s added by analyzer (e.g.
         // `ResolveAggregateFunctions` rule)
