
Commit 7c23c0d

aarondav authored and pwendell committed
[SPARK-2412] CoalescedRDD throws exception with certain pref locs
If the first pass of CoalescedRDD does not find the target number of locations AND the second pass finds new locations, an exception is thrown, as "groupHash.get(nxt_replica).get" is not valid.

The fix is just to add an ArrayBuffer to groupHash for that replica if it didn't already exist.

Author: Aaron Davidson <[email protected]>

Closes apache#1337 from aarondav/2412 and squashes the following commits:

f587b5d [Aaron Davidson] getOrElseUpdate
3ad8a3c [Aaron Davidson] [SPARK-2412] CoalescedRDD throws exception with certain pref locs
1 parent 9c24974 commit 7c23c0d
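For context (not part of the commit), a minimal sketch of the failure mode using a plain mutable map in place of the coalescer's groupHash; the object name and the String/ArrayBuffer[Int] types here are illustrative only:

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object GroupHashSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for groupHash: preferred location -> partition groups.
    val groupHash = mutable.Map[String, ArrayBuffer[Int]]()

    // Old lookup: if the first pass never registered "m2", Option.get throws
    // java.util.NoSuchElementException ("None.get") during the second pass.
    try {
      groupHash.get("m2").get += 1
    } catch {
      case e: NoSuchElementException => println(s"old lookup fails: $e")
    }

    // Fixed lookup: getOrElseUpdate inserts an empty buffer for the missing key
    // and returns it, so the append always succeeds.
    groupHash.getOrElseUpdate("m2", ArrayBuffer()) += 1
    println(groupHash("m2")) // ArrayBuffer(1)
  }
}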

File tree

2 files changed: +16 -2 lines changed


core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -258,7 +258,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
         val pgroup = PartitionGroup(nxt_replica)
         groupArr += pgroup
         addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
         numCreated += 1
       }
     }
@@ -267,7 +267,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
       var (nxt_replica, nxt_part) = rotIt.next()
       val pgroup = PartitionGroup(nxt_replica)
       groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
+      groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       var tries = 0
       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
         nxt_part = rotIt.next()._2

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }

+  // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+  test("coalesced RDDs with locality, fail first pass") {
+    val initialPartitions = 1000
+    val targetLen = 50
+    val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+    val blocks = (1 to initialPartitions).map { i =>
+      (i, List(if (i > couponCount) "m2" else "m1"))
+    }
+    val data = sc.makeRDD(blocks)
+    val coalesced = data.coalesce(targetLen)
+    assert(coalesced.partitions.length == targetLen)
+  }
+
   test("zipped RDDs") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val zipped = nums.zip(nums.map(_ + 1.0))
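A quick arithmetic check (a sketch, not part of the commit) of the "// = 492" comment in the new test: with targetLen = 50, the expression is 2 * (50 * ln 50 + 50 + 0.5).toInt = 2 * 246 = 492, so partitions 1..492 prefer "m1" and 493..1000 prefer "m2". The intent appears to be that only two distinct locations exist, so the first pass cannot create 50 location-based groups and the second pass (the code path fixed above) must run.

object CouponCountCheck extends App {
  val targetLen = 50
  // Same expression as in the test; .toInt truncates 246.1 to 246 before doubling.
  val couponCount = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
  println(couponCount) // 492
}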

0 commit comments