@@ -274,37 +274,42 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Diff hunk for the large-scale coalesce locality test. Lines whose number is
// followed by `-` are the removed version; lines whose number is followed by `+`
// are the replacement; lines with two fused numbers are unchanged context.
// Both versions build 10000 blocks, each tagged with 3 randomly chosen machine
// names out of 50, coalesce them into numMachines * 2 groups, and assert on the
// locality and the size of every resulting group.
274274 test(" coalesced RDDs with locality, large scale (10K partitions)" ) {
275275 // large scale experiment
276276 import collection .mutable
277- val rnd = scala.util.Random
278277 val partitions = 10000
279278 val numMachines = 50
280279 val machines = mutable.ListBuffer [String ]()
281- (1 to numMachines).foreach(machines += " m" + _)
282-
283- val blocks = (1 to partitions).map(i =>
284- { (i, Array .fill(3 )(machines(rnd.nextInt(machines.size))).toList) } )
285-
286- val data2 = sc.makeRDD(blocks)
287- val coalesced2 = data2.coalesce(numMachines* 2 )
288-
289- // test that you get over 90% locality in each group
290- val minLocality = coalesced2.partitions
291- .map(part => part.asInstanceOf [CoalescedRDDPartition ].localFraction)
292- .foldLeft(1.0 )((perc, loc) => math.min(perc,loc))
293- assert(minLocality >= 0.90 , " Expected 90% locality but got " + (minLocality* 100.0 ).toInt + " %" )
294-
295- // test that the groups are load balanced with 100 +/- 20 elements in each
296- val maxImbalance = coalesced2.partitions
297- .map(part => part.asInstanceOf [CoalescedRDDPartition ].parents.size)
298- .foldLeft(0 )((dev, curr) => math.max(math.abs(100 - curr),dev))
299- assert(maxImbalance <= 20 , " Expected 100 +/- 20 per partition, but got " + maxImbalance)
300-
301- val data3 = sc.makeRDD(blocks).map(i => i* 2 ) // derived RDD to test *current* pref locs
302- val coalesced3 = data3.coalesce(numMachines* 2 )
303- val minLocality2 = coalesced3.partitions
304- .map(part => part.asInstanceOf [CoalescedRDDPartition ].localFraction)
305- .foldLeft(1.0 )((perc, loc) => math.min(perc,loc))
306- assert(minLocality2 >= 0.90 , " Expected 90% locality for derived RDD but got " +
307- (minLocality2* 100.0 ).toInt + " %" )
// Replacement version starts here: the same three assertions as the removed
// lines, but wrapped in a loop that repeats the experiment for seeds 1 to 5.
280+ (1 to numMachines).foreach(machines += " m" + _)
281+ val rnd = scala.util.Random
282+ for (seed <- 1 to 5 ) {
283+ rnd.setSeed(seed)
// Reseeding before generating `blocks` fixes the random machine choices for
// each iteration, so a failure can be tied to a specific seed.
// NOTE(review): rnd is the shared scala.util.Random object, so setSeed also
// affects any other code drawing from it - confirm that is acceptable here.
284+
// Each entry pairs the index i with a List of 3 machine names drawn at random.
// Presumably makeRDD treats that List as the preferred locations of element i
// (the inline comment further down speaks of "pref locs") - verify against
// SparkContext.makeRDD.
285+ val blocks = (1 to partitions).map { i =>
286+ (i, Array .fill(3 )(machines(rnd.nextInt(machines.size))).toList)
287+ }
288+
289+ val data2 = sc.makeRDD(blocks)
290+ val coalesced2 = data2.coalesce(numMachines * 2 )
291+
// The fold starts at 1.0 and keeps the minimum, so minLocality is the worst
// localFraction over all groups; the assertion accepts exactly 0.90 as well.
292+ // test that you get over 90% locality in each group
293+ val minLocality = coalesced2.partitions
294+ .map(part => part.asInstanceOf [CoalescedRDDPartition ].localFraction)
295+ .foldLeft(1.0 )((perc, loc) => math.min(perc, loc))
296+ assert(minLocality >= 0.90 , " Expected 90% locality but got " +
297+ (minLocality * 100.0 ).toInt + " %" )
298+
// The target of 100 is partitions / (numMachines * 2) = 10000 / 100; the fold
// keeps the largest absolute deviation from it over all groups.
299+ // test that the groups are load balanced with 100 +/- 20 elements in each
300+ val maxImbalance = coalesced2.partitions
301+ .map(part => part.asInstanceOf [CoalescedRDDPartition ].parents.size)
302+ .foldLeft(0 )((dev, curr) => math.max(math.abs(100 - curr), dev))
303+ assert(maxImbalance <= 20 , " Expected 100 +/- 20 per partition, but got " + maxImbalance)
304+
// Same locality check, applied to an RDD derived via map instead of the RDD
// returned directly by makeRDD.
305+ val data3 = sc.makeRDD(blocks).map(i => i * 2 ) // derived RDD to test *current* pref locs
306+ val coalesced3 = data3.coalesce(numMachines * 2 )
307+ val minLocality2 = coalesced3.partitions
308+ .map(part => part.asInstanceOf [CoalescedRDDPartition ].localFraction)
309+ .foldLeft(1.0 )((perc, loc) => math.min(perc, loc))
310+ assert(minLocality2 >= 0.90 , " Expected 90% locality for derived RDD but got " +
311+ (minLocality2 * 100.0 ).toInt + " %" )
312+ }
308313 }
309314
310315 test(" zipped RDDs" ) {
0 commit comments