From acfa46aad3140b7d10890b15f13519db684cd2b7 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 10 May 2014 17:59:13 -0700
Subject: [PATCH 1/2] SPARK-1770: Load balance elements when repartitioning.

This patch adds better balancing when performing a repartition of an
RDD. Previously the elements in the RDD were hash partitioned, meaning
if the RDD was skewed certain partitions would end up being very large.

This commit adds load balancing of elements across the repartitioned
RDD splits. The load balancing is not perfect: a given output partition
can have up to N more elements than the average if there are N input
partitions. However, some randomization is used to minimize the
probability that this happens.
---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 13 ++++++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 33 +++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a1ca612cc9a09..1a8402ba360a0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
       : RDD[T] = {
     if (shuffle) {
+      /** Distributes elements evenly across output partitions, starting from a random partition. */
+      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+        var position = (new Random(index)).nextInt(numPartitions)
+        items.map{ t =>
+          position = position + 1 % numPartitions
+          (position, t)
+        }
+      }
+
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, T)](mapPartitionsWithIndex(distributePartition),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 8da9a0da700e0..ffb8810eb91e0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }
 
+  test("repartitioned RDDs perform load balancing") {
+    // Coalesce partitions
+    val input = Array.fill(1000)(1)
+    val initialPartitions = 10
+    val data = sc.parallelize(input, initialPartitions)
+
+    val repartitioned1 = data.repartition(2)
+    assert(repartitioned1.partitions.size == 2)
+    val partitions1 = repartitioned1.glom().collect()
+    // some noise in balancing is allowed due to randomization
+    assert(math.abs(partitions1(0).length - 500) < initialPartitions)
+    assert(math.abs(partitions1(1).length - 500) < initialPartitions)
+    assert(repartitioned1.collect() === input)
+
+    def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
+      val data = sc.parallelize(input, initialPartitions)
+      val repartitioned = data.repartition(finalPartitions)
+      assert(repartitioned.partitions.size == finalPartitions)
+      val partitions = repartitioned.glom().collect()
+      // assert all elements are present
+      assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
+      // assert no bucket is overloaded
+      for (partition <- partitions) {
+        val avg = input.size / finalPartitions
+        val maxPossible = avg + initialPartitions
+        assert(partition.length <= maxPossible)
+      }
+    }
+
+    testSplitPartitions(Array.fill(100)(1), 10, 20)
+    testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
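
A note on the balancing scheme: each input partition seeds a Random with
its own index, picks a random starting slot, and then deals its elements
round-robin, so any single output partition receives at most one element
above its even share from each input partition. With N input partitions
that caps a bucket at roughly avg + N elements, which is exactly the
maxPossible = avg + initialPartitions bound the test asserts. A minimal
plain-Scala sketch of the scheme (not Spark code; simulateRepartition is
an illustrative name, and the trailing mod stands in for HashPartitioner):

    import scala.util.Random

    object RepartitionBalanceSketch extends App {
      // Simulate distributePartition: every input partition starts at a
      // random output slot (seeded by its index) and deals its elements
      // round-robin; `% numPartitions` plays the role of HashPartitioner.
      def simulateRepartition(partitionSizes: Seq[Int], numPartitions: Int): Array[Int] = {
        val buckets = new Array[Int](numPartitions)
        for ((size, index) <- partitionSizes.zipWithIndex) {
          var position = new Random(index).nextInt(numPartitions)
          for (_ <- 0 until size) {
            position += 1
            buckets(position % numPartitions) += 1
          }
        }
        buckets
      }

      // A skewed input: one partition of 900 elements plus nine of 11 each.
      val buckets = simulateRepartition(Seq(900) ++ Seq.fill(9)(11), 2)
      println(buckets.mkString(", "))  // each bucket lands within 10 of the ~500 mean
    }

Seeding the generator with the partition index rather than a global RNG
presumably keeps the assignment deterministic, so a recomputed task
reproduces the same element placement.
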
From f9da752e034d11f181500ae44212391493d8f46d Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 11 May 2014 17:09:57 -0700
Subject: [PATCH 2/2] Response to Matei's feedback

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 6 ++++--
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1a8402ba360a0..aa03e9276fb34 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -331,8 +331,10 @@ abstract class RDD[T: ClassTag](
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
         var position = (new Random(index)).nextInt(numPartitions)
-        items.map{ t =>
-          position = position + 1 % numPartitions
+        items.map { t =>
+          // Note that the hash code of the key will just be the key itself. The HashPartitioner
+          // will mod it with the number of total partitions.
+          position = position + 1
           (position, t)
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index ffb8810eb91e0..e686068f7a99a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -219,7 +219,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
       val data = sc.parallelize(input, initialPartitions)
       val repartitioned = data.repartition(finalPartitions)
-      assert(repartitioned.partitions.size == finalPartitions)
+      assert(repartitioned.partitions.size === finalPartitions)
       val partitions = repartitioned.glom().collect()
       // assert all elements are present
       assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
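
Why the follow-up can drop the "% numPartitions": in the first patch the
expression position + 1 % numPartitions parses as position + (1 %
numPartitions), i.e. position + 1, so the follow-up codifies the behavior
that was already happening. That is safe because the shuffle keys are
Ints, an Int hashes to itself, and HashPartitioner already reduces the
key's hash modulo the number of partitions, so letting position grow
without bound still walks the output partitions round-robin. A short
sketch of that reduction (nonNegativeMod mirrors what Spark's
Utils.nonNegativeMod is assumed to do; treat the helper's exact name and
location as an assumption):

    object HashPartitionerSketch extends App {
      // Assumed behavior of HashPartitioner.getPartition: reduce the key's
      // hash code to a non-negative partition id.
      def nonNegativeMod(x: Int, mod: Int): Int = {
        val rawMod = x % mod
        rawMod + (if (rawMod < 0) mod else 0)
      }

      def getPartition(key: Any, numPartitions: Int): Int =
        nonNegativeMod(key.hashCode, numPartitions)

      // An Int key hashes to itself, so consecutive positions land in
      // consecutive partitions -- the round-robin the patch relies on.
      assert(42.hashCode == 42)
      assert((0 until 8).map(getPartition(_, 4)) == Seq(0, 1, 2, 3, 0, 1, 2, 3))
    }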