Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,17 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* Return a new RDD that has exactly `numPartitions` partitions. Differs from
* [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
* of the output requires some specific ordering or distribution of the data.
*/
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
extends RedistributeData

case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
/**
* This method repartitions data using [[Expression]]s, and receives information about the
* number of partitions during execution. Used when a specific ordering or distribution is
* expected by the consumer of the query result. Use [[Repartition]] for RDD-like
* `coalesce` and `repartition`.
*/
case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
extends RedistributeData
9 changes: 2 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -961,9 +961,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def repartition(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
schema, needsConversion = false)
Repartition(numPartitions, shuffle = true, logicalPlan)
}

/**
Expand All @@ -974,10 +972,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def coalesce(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.coalesce(numPartitions),
schema,
needsConversion = false)
Repartition(numPartitions, shuffle = false, logicalPlan)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil

case logical.Repartition(numPartitions, shuffle, child) =>
execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
Expand Down Expand Up @@ -317,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
case logical.OneRowRelation =>
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
case logical.RepartitionByExpression(expressions, child) =>
execution.Exchange(
HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
case e @ EvaluatePython(udf, child, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
}
}

/**
* :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
*/
@DeveloperApi
case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def execute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}


/**
* :: DeveloperApi ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case (None, Some(perPartitionOrdering), None, None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
case (None, None, Some(partitionExprs), None) =>
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, Some(clusterExprs)) =>
Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
case (None, None, None, None) => withHaving
case _ => sys.error("Unsupported set of ordering / distribution clauses.")
}
Expand Down