Commit 62088ca

deal with partitioner when defaultParallelism is explicitly set.

1 parent ff48b1b commit 62088ca

File tree

3 files changed, +101 -26 lines changed

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 27 additions & 24 deletions
@@ -43,17 +43,19 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, and the number of partitions of the
-   * partitioner is either greater than or is less than and within a single order of
-   * magnitude of the max number of upstream partitions, choose that one.
+   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
+   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
    *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   * If any of the RDDs already has a partitioner, and the partitioner is an eligible one (with a
+   * partitions number that is not less than the max number of upstream partitions by an order of
+   * magnitude), or the number of partitions is larger than the default one, we'll choose the
+   * existing partitioner.
    *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
+   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
+   * number of partitions in the largest upstream RDD, as this should be least likely to cause
+   * out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }

-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
       hasMaxPartitioner.get.partitioner.get
     } else {
-      if (rdd.context.conf.contains("spark.default.parallelism")) {
-        new HashPartitioner(rdd.context.defaultParallelism)
-      } else {
-        new HashPartitioner(rdds.map(_.partitions.length).max)
-      }
+      new HashPartitioner(defaultNumPartitions)
     }
   }

   /**
-   * Returns true if the number of partitions of the RDD is either greater
-   * than or is less than and within a single order of magnitude of the
-   * max number of upstream partitions;
-   * otherwise, returns false
+   * Returns true if the number of partitions of the RDD is either greater than or is less than and
+   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
+   * false.
    */
   private def isEligiblePartitioner(
-      hasMaxPartitioner: Option[RDD[_]],
+      hasMaxPartitioner: RDD[_],
       rdds: Seq[RDD[_]]): Boolean = {
-    if (hasMaxPartitioner.isEmpty) {
-      return false
-    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
   }
 }
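
To make the selection rule above concrete, here is a minimal standalone sketch of the logic this hunk implements. It is illustration only, not Spark API: the object name, method name, and parameters are invented, and the real code operates on RDDs rather than raw partition counts.

import scala.math.log10

// Illustrative model of the commit's selection rule (invented names, not Spark API).
object DefaultPartitionerModel {
  def chooseNumPartitions(
      upstreamPartitionCounts: Seq[Int],
      existingPartitionerSize: Option[Int],
      configuredDefaultParallelism: Option[Int]): Int = {
    val maxUpstream = upstreamPartitionCounts.max
    // The default comes from spark.default.parallelism when it is explicitly set,
    // otherwise from the largest upstream RDD.
    val defaultNumPartitions = configuredDefaultParallelism.getOrElse(maxUpstream)
    existingPartitionerSize match {
      // Reuse an existing partitioner when it is within one order of magnitude of
      // the largest upstream RDD, or already has more partitions than the default.
      case Some(n) if log10(maxUpstream) - log10(n) < 1 || n > defaultNumPartitions => n
      case _ => defaultNumPartitions
    }
  }
}

// With spark.default.parallelism = 4, an existing 10-partition partitioner wins over
// the default (10 > 4) even though 150 upstream partitions fail the eligibility test:
//   DefaultPartitionerModel.chooseNumPartitions(Seq(150, 10), Some(10), Some(4)) == 10
// With no existing partitioner, the explicitly set default wins:
//   DefaultPartitionerModel.chooseNumPartitions(Seq(20, 10), None, Some(4)) == 4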

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 31 additions & 0 deletions
@@ -284,7 +284,38 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+  }

+  test("defaultPartitioner when defaultParallelism is set") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(10))
+      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+        .partitionBy(new HashPartitioner(100))
+      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(9))
+      val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
+
+      val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+      val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+      val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+      val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+      val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
+      val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+
+      assert(partitioner1.numPartitions == rdd2.getNumPartitions)
+      assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+      assert(partitioner6.numPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
   }
 }
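
As a quick sanity check on two of the expectations above, the order-of-magnitude rule from Partitioner.scala can be evaluated directly (plain Scala, runnable in any REPL):

import scala.math.log10

// partitioner1: rdd1 has 150 partitions, rdd2's partitioner has 10. Ten partitions
// fail the eligibility test, but 10 > the configured default of 4, so 10 is kept.
assert(log10(150) - log10(10) >= 1)  // 1.176..., more than one order of magnitude
// partitioner5: rdd5 has 11 partitions, rdd4's partitioner has 9. Nine partitions
// pass the eligibility test, so rdd4's partitioner is reused.
assert(log10(11) - log10(9) < 1)     // 0.087...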

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 43 additions & 2 deletions
@@ -322,8 +322,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   // See SPARK-22465
-  test("cogroup between multiple RDD" +
-    " with number of partitions similar in order of magnitude") {
+  test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
       .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
   }

+  test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with huge number of " +
+    "partitions from upstream RDDs") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
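
Putting the three new tests together, here is a hedged end-to-end sketch of the behavior they pin down (the master URL and app name are assumptions for a local demo, not part of the commit):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")  // assumed local setup, for illustration only
  .setAppName("default-parallelism-demo")
  .set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)

val big = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
val small = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)), 10)

// Neither side has a partitioner, so the explicitly set default (4) wins.
assert(big.cogroup(small).getNumPartitions == 4)

// An existing HashPartitioner within an order of magnitude of the largest
// upstream RDD is reused instead of the default.
val partitioned = small.partitionBy(new HashPartitioner(10))
assert(big.cogroup(partitioned).getNumPartitions == 10)

sc.stop()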
