From 686c90b5ef711369deb7d78d8ac68f417a885664 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 16:49:09 -0700
Subject: [PATCH 1/6] made coalesce and repartition a part of the query plan

---
 .../org/apache/spark/sql/catalyst/SqlParser.scala   |  3 ++-
 .../spark/sql/catalyst/optimizer/Optimizer.scala    |  6 ++++--
 .../sql/catalyst/plans/logical/basicOperators.scala |  4 ++++
 .../main/scala/org/apache/spark/sql/DataFrame.scala | 10 +++-------
 .../spark/sql/execution/SparkStrategies.scala       |  3 ++-
 .../apache/spark/sql/execution/basicOperators.scala | 13 +++++++++++++
 6 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 0af969cc5cc6..e0734b2df6d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -308,7 +308,8 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       { case s ~ p => Substring(s, p, Literal(Integer.MAX_VALUE)) }
     | (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^
       { case s ~ p ~ l => Substring(s, p, l) }
-    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs => Coalesce(exprs) }
+    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs =>
+        expressions.Coalesce(exprs) }
     | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
     | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
     | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2d03fbfb0d31..984c15d698cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.expressions
+
 import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions._
@@ -234,7 +236,7 @@ object NullPropagation extends Rule[LogicalPlan] {
       case e @ Count(expr) if !expr.nullable => Count(Literal(1))
 
       // For Coalesce, remove null literals.
-      case e @ Coalesce(children) =>
+      case e @ expressions.Coalesce(children) =>
         val newChildren = children.filter {
           case Literal(null, _) => false
           case _ => true
@@ -244,7 +246,7 @@
         } else if (newChildren.length == 1) {
           newChildren(0)
         } else {
-          Coalesce(newChildren)
+          expressions.Coalesce(newChildren)
         }
       case e @ Substring(Literal(null, _), _, _) => Literal.create(null, e.dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index bbc94a7ab339..4ba3e6db7ac7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,6 +310,10 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+case class Coalesce(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
 /**
  * A relation with one row. This is used in "SELECT ..." without a from clause.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca6ae482eb2a..7da44878649b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, S
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
+import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
@@ -961,9 +962,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
-      schema, needsConversion = false)
+    logical.Coalesce(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -974,10 +973,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def coalesce(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(
-      queryExecution.toRdd.coalesce(numPartitions),
-      schema,
-      needsConversion = false)
+    logical.Coalesce(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 030ef118f75d..8571496d9b23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
-
+      case logical.Coalesce(numPartitions, shuffle, child) =>
+        execution.Coalesce(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d286fe81bee5..4ffa02ee4f80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -245,6 +245,19 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Return a new RDD that has exactly numPartitions partitions.
+ */
+@DeveloperApi
+case class Coalesce(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def execute(): RDD[Row] = {
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+  }
+}
+
 /**
  * :: DeveloperApi ::
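What this first patch changes in user-visible terms: DataFrame.coalesce and DataFrame.repartition no longer eagerly materialize an RDD; they now build a logical Coalesce node that the planner later maps onto the physical operator. A minimal usage sketch of the new behavior (the `sqlContext` and the `people` table are assumed here for illustration, and the explain output is paraphrased, not captured from this branch):

    // Scala, spark-shell style. Both calls are now lazy plan transformations.
    val df = sqlContext.table("people")    // assumed registered table
    val merged = df.coalesce(4)            // builds Coalesce(4, shuffle = false, plan); no job runs yet
    merged.explain(true)                   // plan should now show a Coalesce operator
    df.repartition(100).explain(true)      // Coalesce(100, shuffle = true, plan)

A practical consequence of this design: operators composed after coalesce/repartition stay inside a single query plan, so the optimizer sees the whole pipeline instead of a pre-materialized RDD scan.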
From f2e6af1eece747ba9f6b067c1b0aebf3646e6abd Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 16:52:22 -0700
Subject: [PATCH 2/6] add ticks

---
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4ffa02ee4f80..242591eeedeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -247,7 +247,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
 
 /**
  * :: DeveloperApi ::
- * Return a new RDD that has exactly numPartitions partitions.
+ * Return a new RDD that has exactly `numPartitions` partitions.
  */
 @DeveloperApi
 case class Coalesce(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode {
From 2c349b5a68c555b53fda74fd1bb489519d06a90c Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 16:59:52 -0700
Subject: [PATCH 3/6] address comments

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 984c15d698cf..baeee000dc56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.expressions
-
 import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
+import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.FullOuter

From fa4509f8eb92ed974d9d18e4f7e0f113df64156a Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 19:10:56 -0700
Subject: [PATCH 4/6] rename coalesce

---
 .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 3 +--
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 5 ++---
 .../spark/sql/catalyst/plans/logical/basicOperators.scala    | 3 ++-
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 5 ++---
 .../org/apache/spark/sql/execution/SparkStrategies.scala     | 4 ++--
 .../org/apache/spark/sql/execution/basicOperators.scala      | 3 ++-
 6 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e0734b2df6d1..0af969cc5cc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -308,8 +308,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       { case s ~ p => Substring(s, p, Literal(Integer.MAX_VALUE)) }
     | (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^
       { case s ~ p ~ l => Substring(s, p, l) }
-    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs =>
-        expressions.Coalesce(exprs) }
+    | COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs => Coalesce(exprs) }
     | SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
     | ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
     | ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index baeee000dc56..2d03fbfb0d31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -235,7 +234,7 @@ object NullPropagation extends Rule[LogicalPlan] {
       case e @ Count(expr) if !expr.nullable => Count(Literal(1))
 
       // For Coalesce, remove null literals.
-      case e @ expressions.Coalesce(children) =>
+      case e @ Coalesce(children) =>
         val newChildren = children.filter {
           case Literal(null, _) => false
           case _ => true
@@ -245,7 +244,7 @@
         } else if (newChildren.length == 1) {
           newChildren(0)
         } else {
-          expressions.Coalesce(newChildren)
+          Coalesce(newChildren)
         }
       case e @ Substring(Literal(null, _), _, _) => Literal.create(null, e.dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 4ba3e6db7ac7..9622e9c5e1c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,7 +310,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class Coalesce(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode {
+case class CoalescePartitions(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+  extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 7da44878649b..01a864d1fa23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, S
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
@@ -962,7 +961,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    logical.Coalesce(numPartitions, shuffle = true, logicalPlan)
+    CoalescePartitions(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -973,7 +972,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def coalesce(numPartitions: Int): DataFrame = {
-    logical.Coalesce(numPartitions, shuffle = false, logicalPlan)
+    CoalescePartitions(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8571496d9b23..29d8983397ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,8 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
-      case logical.Coalesce(numPartitions, shuffle, child) =>
-        execution.Coalesce(numPartitions, shuffle, planLater(child)) :: Nil
+      case logical.CoalescePartitions(numPartitions, shuffle, child) =>
+        execution.CoalescePartitions(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 242591eeedeb..50a8babe2419 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -250,7 +250,8 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  * Return a new RDD that has exactly `numPartitions` partitions.
  */
 @DeveloperApi
-case class Coalesce(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode {
+case class CoalescePartitions(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
+  extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def execute(): RDD[Row] = {
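The `shuffle` flag carried through the logical and physical nodes is the same flag `RDD.coalesce` takes, and it is what separates `repartition` (shuffle = true) from `coalesce` (shuffle = false) at execution time. A short sketch of the underlying RDD contract (assumes a live SparkContext named `sc`; the partition counts illustrate the documented semantics rather than output captured from this branch):

    // RDD.coalesce is what the physical operator ultimately delegates to.
    val rdd = sc.parallelize(1 to 1000, 8)
    // shuffle = false: a narrow dependency that merges partitions locally,
    // so the partition count can only decrease.
    rdd.coalesce(4, shuffle = false).partitions.length    // 4
    rdd.coalesce(16, shuffle = false).partitions.length   // still 8: cannot grow without a shuffle
    // shuffle = true: a full exchange, so the count can grow as well as shrink.
    rdd.coalesce(16, shuffle = true).partitions.length    // 16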
From 5807e353a5a6f629b168dc210efec2ee47fa53e9 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 19:48:23 -0700
Subject: [PATCH 5/6] renamed coalescepartitions

---
 .../spark/sql/catalyst/plans/logical/basicOperators.scala | 2 +-
 .../spark/sql/catalyst/plans/logical/partitioning.scala   | 2 +-
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala   | 4 ++--
 .../org/apache/spark/sql/execution/SparkStrategies.scala  | 6 +++---
 .../org/apache/spark/sql/execution/basicOperators.scala   | 2 +-
 .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 6 +++---
 6 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9622e9c5e1c1..14a08990ed07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,7 +310,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class CoalescePartitions(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index e737418d9c3b..f34b9e15c3a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -32,5 +32,5 @@ abstract class RedistributeData extends UnaryNode {
 case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
   extends RedistributeData
 
-case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
   extends RedistributeData
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 01a864d1fa23..2affba7d42cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -961,7 +961,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    CoalescePartitions(numPartitions, shuffle = true, logicalPlan)
+    Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -972,7 +972,7 @@ class DataFrame private[sql](
    * @group rdd
    */
   override def coalesce(numPartitions: Int): DataFrame = {
-    CoalescePartitions(numPartitions, shuffle = false, logicalPlan)
+    Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 29d8983397ab..3a0a6c86700a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -283,8 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
-      case logical.CoalescePartitions(numPartitions, shuffle, child) =>
-        execution.CoalescePartitions(numPartitions, shuffle, planLater(child)) :: Nil
+      case logical.Repartition(numPartitions, shuffle, child) =>
+        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
@@ -318,7 +318,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
-      case logical.Repartition(expressions, child) =>
+      case logical.RepartitionByExpression(expressions, child) =>
         execution.Exchange(
           HashPartitioning(expressions, numPartitions), Nil, planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 50a8babe2419..1afdb409417c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -250,7 +250,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  * Return a new RDD that has exactly `numPartitions` partitions.
 */
 @DeveloperApi
-case class CoalescePartitions(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0ea6d57b816c..2dc6463abafa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -783,13 +783,13 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       case (None, Some(perPartitionOrdering), None, None) =>
         Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
       case (None, None, Some(partitionExprs), None) =>
-        Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
+        RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving)
       case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
         Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
-          Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
+          RepartitionByExpression(partitionExprs.getChildren.map(nodeToExpr), withHaving))
       case (None, None, None, Some(clusterExprs)) =>
         Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
-          Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
+          RepartitionByExpression(clusterExprs.getChildren.map(nodeToExpr), withHaving))
       case (None, None, None, None) => withHaving
       case _ => sys.error("Unsupported set of ordering / distribution clauses.")
     }
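After this rename the two logical nodes divide the work cleanly: Repartition carries an explicit partition count coming from the DataFrame API, while RepartitionByExpression carries partitioning expressions (e.g. from Hive's DISTRIBUTE BY) and leaves the partition count to the planner. A hedged construction sketch follows; OneRowRelation is used only as a convenient leaf plan, and the Literal partitioning expression is illustrative, not something a real query would produce:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Repartition, RepartitionByExpression}

    // The user asked for an exact partition count (DataFrame.repartition/coalesce):
    val byCount = Repartition(numPartitions = 10, shuffle = true, child = OneRowRelation)

    // The consumer requires a specific distribution; the partition count is decided
    // later, during planning (see the HashPartitioning case in SparkStrategies):
    val byExpr = RepartitionByExpression(Seq(Literal(42)), OneRowRelation)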
From b1e76dda3f8cee8010af95fd7bc62d1eb7993128 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Tue, 28 Apr 2015 20:59:22 -0700
Subject: [PATCH 6/6] added documentation on repartitions

---
 .../spark/sql/catalyst/plans/logical/basicOperators.scala | 6 ++++++
 .../spark/sql/catalyst/plans/logical/partitioning.scala   | 6 ++++++
 2 files changed, 12 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 14a08990ed07..608e272da778 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -310,6 +310,12 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] in that this node is created directly by the DataFrame API
+ * when the user calls `coalesce` or `repartition`. [[RepartitionByExpression]] is used when
+ * the consumer of the output requires some specific ordering or distribution of the data.
+ */
 case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
index f34b9e15c3a2..63df2c1ee72f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -32,5 +32,11 @@ abstract class RedistributeData extends UnaryNode {
 case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
   extends RedistributeData
 
+/**
+ * This operator repartitions data using [[Expression]]s, and receives information about the
+ * number of partitions during execution. Used when a specific ordering or distribution is
+ * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
+ * `coalesce` and `repartition`.
+ */
 case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
   extends RedistributeData
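Taken together, the series leaves two entry points that exercise the two nodes. A hedged end-to-end sketch (assumes a HiveContext named `hiveContext` and a registered table `logs`; the comments describe the expected plan shape, not output captured from this branch):

    // DISTRIBUTE BY is parsed by HiveQl into RepartitionByExpression,
    // which plans to an Exchange with HashPartitioning:
    hiveContext.sql("SELECT key, value FROM logs DISTRIBUTE BY key").explain(true)

    // DataFrame.coalesce/repartition build the Repartition node, which plans to
    // the physical Repartition operator (RDD coalesce under the hood):
    hiveContext.table("logs").coalesce(4).explain(true)
    hiveContext.table("logs").repartition(200).explain(true)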