Skip to content

Commit 686c90b

Browse files
committed
made coalesce and repartition a part of the query plan
1 parent b14cd23 commit 686c90b

File tree

6 files changed

+28
-11
lines changed

6 files changed

+28
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -308,7 +308,8 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
308308
{ case s ~ p => Substring(s, p, Literal(Integer.MAX_VALUE)) }
309309
| (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^
310310
{ case s ~ p ~ l => Substring(s, p, l) }
311-
| COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs => Coalesce(exprs) }
311+
| COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs =>
312+
expressions.Coalesce(exprs) }
312313
| SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
313314
| ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
314315
| ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.sql.catalyst.expressions
21+
2022
import scala.collection.immutable.HashSet
2123
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
2224
import org.apache.spark.sql.catalyst.expressions._
@@ -234,7 +236,7 @@ object NullPropagation extends Rule[LogicalPlan] {
234236
case e @ Count(expr) if !expr.nullable => Count(Literal(1))
235237

236238
// For Coalesce, remove null literals.
237-
case e @ Coalesce(children) =>
239+
case e @ expressions.Coalesce(children) =>
238240
val newChildren = children.filter {
239241
case Literal(null, _) => false
240242
case _ => true
@@ -244,7 +246,7 @@ object NullPropagation extends Rule[LogicalPlan] {
244246
} else if (newChildren.length == 1) {
245247
newChildren(0)
246248
} else {
247-
Coalesce(newChildren)
249+
expressions.Coalesce(newChildren)
248250
}
249251

250252
case e @ Substring(Literal(null, _), _, _) => Literal.create(null, e.dataType)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -310,6 +310,10 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
310310
override def output: Seq[Attribute] = child.output
311311
}
312312

313+
case class Coalesce(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode {
314+
override def output: Seq[Attribute] = child.output
315+
}
316+
313317
/**
314318
* A relation with one row. This is used in "SELECT ..." without a from clause.
315319
*/

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, S
3737
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
3838
import org.apache.spark.sql.catalyst.expressions._
3939
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
40+
import org.apache.spark.sql.catalyst.plans.logical
4041
import org.apache.spark.sql.catalyst.plans.logical._
4142
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
4243
import org.apache.spark.sql.jdbc.JDBCWriteDetails
@@ -961,9 +962,7 @@ class DataFrame private[sql](
961962
* @group rdd
962963
*/
963964
override def repartition(numPartitions: Int): DataFrame = {
964-
sqlContext.createDataFrame(
965-
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
966-
schema, needsConversion = false)
965+
logical.Coalesce(numPartitions, shuffle = true, logicalPlan)
967966
}
968967

969968
/**
@@ -974,10 +973,7 @@ class DataFrame private[sql](
974973
* @group rdd
975974
*/
976975
override def coalesce(numPartitions: Int): DataFrame = {
977-
sqlContext.createDataFrame(
978-
queryExecution.toRdd.coalesce(numPartitions),
979-
schema,
980-
needsConversion = false)
976+
logical.Coalesce(numPartitions, shuffle = false, logicalPlan)
981977
}
982978

983979
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
283283
case logical.Distinct(child) =>
284284
execution.Distinct(partial = false,
285285
execution.Distinct(partial = true, planLater(child))) :: Nil
286-
286+
case logical.Coalesce(numPartitions, shuffle, child) =>
287+
execution.Coalesce(numPartitions, shuffle, planLater(child)) :: Nil
287288
case logical.SortPartitions(sortExprs, child) =>
288289
// This sort only sorts tuples within a partition. Its requiredDistribution will be
289290
// an UnspecifiedDistribution.

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -245,6 +245,19 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
245245
}
246246
}
247247

248+
/**
249+
* :: DeveloperApi ::
250+
* Return a new RDD that has exactly numPartitions partitions.
251+
*/
252+
@DeveloperApi
253+
case class Coalesce(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode {
254+
override def output: Seq[Attribute] = child.output
255+
256+
override def execute(): RDD[Row] = {
257+
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
258+
}
259+
}
260+
248261

249262
/**
250263
* :: DeveloperApi ::

0 commit comments

Comments (0)