diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3235f85d5bbd..17c21f6e3a0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -908,6 +908,20 @@ class DataFrame private[sql]( schema, needsConversion = false) } + /** + * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * @group rdd + */ + override def coalesce(numPartitions: Int): DataFrame = { + sqlContext.createDataFrame( + queryExecution.toRdd.coalesce(numPartitions), + schema, + needsConversion = false) + } + /** * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. * @group dfops diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala index ba4373f0124b..63dbab19947c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] { def repartition(numPartitions: Int): DataFrame + def coalesce(numPartitions: Int): DataFrame + def distinct: DataFrame } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b26e22f6229f..582dc8f023a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -172,6 +172,14 @@ class DataFrameSuite extends QueryTest { testData.select('key).collect().toSeq) } + test("coalesce") { + assert(testData.select('key).coalesce(1).rdd.partitions.size === 1) + + checkAnswer( + testData.select('key).coalesce(1).select('key), + testData.select('key).collect().toSeq) + } + test("groupBy") { checkAnswer( testData2.groupBy("a").agg($"a", sum($"b")),