Skip to content

Commit 271c4c6

Browse files
brkyvz authored and rxin committed
[SPARK-7215] made coalesce and repartition a part of the query plan
Coalesce and repartition now show up as part of the query plan, rather than resulting in a new `DataFrame`.

cc rxin

Author: Burak Yavuz <[email protected]>

Closes apache#5762 from brkyvz/df-repartition and squashes the following commits:

b1e76dd [Burak Yavuz] added documentation on repartitions
5807e35 [Burak Yavuz] renamed coalescepartitions
fa4509f [Burak Yavuz] rename coalesce
2c349b5 [Burak Yavuz] address comments
f2e6af1 [Burak Yavuz] add ticks
686c90b [Burak Yavuz] made coalesce and repartition a part of the query plan
1 parent 5ef006f commit 271c4c6

File tree

6 files changed

+40
-13
lines changed

6 files changed

+40
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
310310
override def output: Seq[Attribute] = child.output
311311
}
312312

313+
/**
314+
* Return a new RDD that has exactly `numPartitions` partitions. Differs from
315+
* [[RepartitionByExpression]] as this operator is created directly by `DataFrame`, because the user
316+
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
317+
* of the output requires some specific ordering or distribution of the data.
318+
*/
319+
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
320+
extends UnaryNode {
321+
override def output: Seq[Attribute] = child.output
322+
}
323+
313324
/**
314325
* A relation with one row. This is used in "SELECT ..." without a from clause.
315326
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
3232
case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
3333
extends RedistributeData
3434

35-
case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
35+
/**
36+
* This operator repartitions data using [[Expression]]s, and receives information about the
37+
* number of partitions during execution. Used when a specific ordering or distribution is
38+
* expected by the consumer of the query result. Use [[Repartition]] for RDD-like
39+
* `coalesce` and `repartition`.
40+
*/
41+
case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
3642
extends RedistributeData

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -961,9 +961,7 @@ class DataFrame private[sql](
961961
* @group rdd
962962
*/
963963
override def repartition(numPartitions: Int): DataFrame = {
964-
sqlContext.createDataFrame(
965-
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
966-
schema, needsConversion = false)
964+
Repartition(numPartitions, shuffle = true, logicalPlan)
967965
}
968966

969967
/**
@@ -974,10 +972,7 @@ class DataFrame private[sql](
974972
* @group rdd
975973
*/
976974
override def coalesce(numPartitions: Int): DataFrame = {
977-
sqlContext.createDataFrame(
978-
queryExecution.toRdd.coalesce(numPartitions),
979-
schema,
980-
needsConversion = false)
975+
Repartition(numPartitions, shuffle = false, logicalPlan)
981976
}
982977

983978
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
283283
case logical.Distinct(child) =>
284284
execution.Distinct(partial = false,
285285
execution.Distinct(partial = true, planLater(child))) :: Nil
286-
286+
case logical.Repartition(numPartitions, shuffle, child) =>
287+
execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
287288
case logical.SortPartitions(sortExprs, child) =>
288289
// This sort only sorts tuples within a partition. Its requiredDistribution will be
289290
// an UnspecifiedDistribution.
@@ -317,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
317318
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
318319
case logical.OneRowRelation =>
319320
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
320-
case logical.Repartition(expressions, child) =>
321+
case logical.RepartitionByExpression(expressions, child) =>
321322
execution.Exchange(
322323
HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
323324
case e @ EvaluatePython(udf, child, _) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
245245
}
246246
}
247247

248+
/**
249+
* :: DeveloperApi ::
250+
* Return a new RDD that has exactly `numPartitions` partitions.
251+
*/
252+
@DeveloperApi
253+
case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
254+
extends UnaryNode {
255+
override def output: Seq[Attribute] = child.output
256+
257+
override def execute(): RDD[Row] = {
258+
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
259+
}
260+
}
261+
248262

249263
/**
250264
* :: DeveloperApi ::

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
783783
case (None, Some(perPartitionOrdering), None, None) =>
784784
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
785785
case (None, None, Some(partitionExprs), None) =>
786-
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
786+
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
787787
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
788788
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
789-
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
789+
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
790790
case (None, None, None, Some(clusterExprs)) =>
791791
Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
792-
Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
792+
RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
793793
case (None, None, None, None) => withHaving
794794
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
795795
}

0 commit comments

Comments
 (0)