File tree: 3 files changed (+24 −0 lines changed)
main/scala/org/apache/spark/sql
test/scala/org/apache/spark/sql Expand file tree Collapse file tree 3 files changed +24
-0
lines changed Original file line number Diff line number Diff line change @@ -908,6 +908,20 @@ class DataFrame private[sql](
908908 schema, needsConversion = false )
909909 }
910910
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * Similar to `coalesce` defined on an [[RDD]], this operation results in a narrow dependency,
 * e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead
 * each of the 100 new partitions will claim 10 of the current partitions.
 * @group rdd
 */
override def coalesce(numPartitions: Int): DataFrame = {
  // Coalesce the underlying row RDD without a shuffle, then rewrap it with the
  // unchanged schema; the rows are already in the internal representation, so no
  // conversion pass is needed.
  val coalescedRows = queryExecution.toRdd.coalesce(numPartitions)
  sqlContext.createDataFrame(coalescedRows, schema, needsConversion = false)
}
911925 /**
912926 * Returns a new [[DataFrame ]] that contains only the unique rows from this [[DataFrame ]].
913927 * @group dfops
Original file line number Diff line number Diff line change @@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] {
6161
/** Returns a new [[DataFrame]] that has exactly `numPartitions` partitions (may shuffle). */
def repartition(numPartitions: Int): DataFrame

/** Returns a new [[DataFrame]] with `numPartitions` partitions via a narrow dependency (no shuffle). */
def coalesce(numPartitions: Int): DataFrame

/** Returns a new [[DataFrame]] that contains only the unique rows of this [[DataFrame]]. */
def distinct: DataFrame
6567}
Original file line number Diff line number Diff line change @@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest {
178178 testData.select(' key ).collect().toSeq)
179179 }
180180
test("coalesce") {
  // Coalescing down to a single partition must actually yield one partition...
  val singlePartition = testData.select('key).coalesce(1)
  assert(singlePartition.rdd.partitions.size === 1)

  // ...and must not change the data itself.
  checkAnswer(
    singlePartition.select('key),
    testData.select('key).collect().toSeq)
}
188+
181189 test(" groupBy" ) {
182190 checkAnswer(
183191 testData2.groupBy(" a" ).agg($" a" , sum($" b" )),
You can’t perform that action at this time.
0 commit comments