
Commit 832d698

jiangxb1987 authored and mridulm committed
[SPARK-22465][FOLLOWUP] Update the number of partitions of default partitioner when defaultParallelism is set
## What changes were proposed in this pull request?

#20002 proposed a way to safety-check the default partitioner; however, if `spark.default.parallelism` is set, the defaultParallelism could still be smaller than the proper number of partitions for the upstream RDDs. This PR extends that approach to cover the case where `spark.default.parallelism` is set. The PR helps when all of the following hold:

- the max partitioner is not eligible, since it is at least an order of magnitude smaller;
- the user has explicitly set `spark.default.parallelism`;
- the value of `spark.default.parallelism` is lower than the max partitioner's partition count, so the default parallelism is even worse than the discarded max partitioner, despite being user-specified.

In all other cases, the change is a no-op.

## How was this patch tested?

Added corresponding test cases in `PairRDDFunctionsSuite` and `PartitioningSuite`.

Author: Xingbo Jiang <[email protected]>

Closes #20091 from jiangxb1987/partitioner.

(cherry picked from commit 96cb60b)

Signed-off-by: Mridul Muralidharan <[email protected]>
1 parent 7241556 commit 832d698
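
The selection rule described above is compact enough to sketch on its own. Below is a minimal, standalone restatement of the patched logic; the helper name `chosenNumPartitions` and its parameters are illustrative, not Spark API (the real code lives in `Partitioner.defaultPartitioner`, shown in the diff further down):

```scala
import scala.math.log10

// Sketch of the patched rule (illustrative, not the Spark source): reuse the
// largest existing partitioner when it is within one order of magnitude of
// the max upstream partition count, or when it still has more partitions than
// the default; otherwise fall back to the default partition count.
def chosenNumPartitions(
    maxPartitionerParts: Option[Int], // largest existing partitioner, if any
    maxUpstreamParts: Int,            // max partitions across the input RDDs
    defaultParallelism: Option[Int]   // Some(n) iff spark.default.parallelism is set
  ): Int = {
  val defaultNumPartitions = defaultParallelism.getOrElse(maxUpstreamParts)
  maxPartitionerParts match {
    case Some(n) if log10(maxUpstreamParts) - log10(n) < 1 || defaultNumPartitions < n => n
    case _ => defaultNumPartitions
  }
}

chosenNumPartitions(Some(10), 1000, Some(4)) // 10: not eligible, but beats the default of 4
chosenNumPartitions(Some(10), 20, None)      // 10: within one order of magnitude
chosenNumPartitions(None, 11, Some(4))       // 4: no existing partitioner, the conf wins
```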

3 files changed: +108 −32 lines


core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 27 additions & 24 deletions
```diff
@@ -43,17 +43,19 @@ object Partitioner {
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
-   * If any of the RDDs already has a partitioner, and the number of partitions of the
-   * partitioner is either greater than or is less than and within a single order of
-   * magnitude of the max number of upstream partitions, choose that one.
+   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
+   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
    *
-   * Otherwise, we use a default HashPartitioner. For the number of partitions, if
-   * spark.default.parallelism is set, then we'll use the value from SparkContext
-   * defaultParallelism, otherwise we'll use the max number of upstream partitions.
+   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
+   * partitioner is eligible (number of partitions within an order of maximum number of partitions
+   * in rdds), or has partition number higher than default partitions number - we use this
+   * partitioner.
    *
-   * Unless spark.default.parallelism is set, the number of partitions will be the
-   * same as the number of partitions in the largest upstream RDD, as this should
-   * be least likely to cause out-of-memory errors.
+   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
+   *
+   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
+   * number of partitions in the largest upstream RDD, as this should be least likely to cause
+   * out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
@@ -67,31 +69,32 @@ object Partitioner {
       None
     }
 
-    if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
+    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
+      rdd.context.defaultParallelism
+    } else {
+      rdds.map(_.partitions.length).max
+    }
+
+    // If the existing max partitioner is an eligible one, or its partitions number is larger
+    // than the default number of partitions, use the existing partitioner.
+    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
+        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
       hasMaxPartitioner.get.partitioner.get
     } else {
-      if (rdd.context.conf.contains("spark.default.parallelism")) {
-        new HashPartitioner(rdd.context.defaultParallelism)
-      } else {
-        new HashPartitioner(rdds.map(_.partitions.length).max)
-      }
+      new HashPartitioner(defaultNumPartitions)
     }
   }
 
   /**
-   * Returns true if the number of partitions of the RDD is either greater
-   * than or is less than and within a single order of magnitude of the
-   * max number of upstream partitions;
-   * otherwise, returns false
+   * Returns true if the number of partitions of the RDD is either greater than or is less than and
+   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
+   * false.
    */
   private def isEligiblePartitioner(
-      hasMaxPartitioner: Option[RDD[_]],
+      hasMaxPartitioner: RDD[_],
       rdds: Seq[RDD[_]]): Boolean = {
-    if (hasMaxPartitioner.isEmpty) {
-      return false
-    }
     val maxPartitions = rdds.map(_.partitions.length).max
-    log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
+    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
   }
 }
```
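
To see why both halves of the new condition matter, here is a quick worked example of the `log10` eligibility check with illustrative numbers (arithmetic only, not code from the patch):

```scala
import scala.math.log10

// An existing 10-partition partitioner against 1000 upstream partitions,
// with spark.default.parallelism = 4 (illustrative values).
val maxPartitions = 1000
val existingParts = 10
val eligible = log10(maxPartitions) - log10(existingParts) < 1 // 3.0 - 1.0 = 2.0, so false
// Before this patch, ineligibility meant falling back to HashPartitioner(4).
// After it, 4 < 10, so the 10-partition partitioner is reused anyway.
assert(!eligible && 4 < existingParts)
```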

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 38 additions & 6 deletions
```diff
@@ -262,14 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
 
   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc
-      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
 
@@ -284,7 +281,42 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+  }
 
+  test("defaultPartitioner when defaultParallelism is set") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
+      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(10))
+      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+        .partitionBy(new HashPartitioner(100))
+      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(9))
+      val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
+      val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+        .partitionBy(new HashPartitioner(3))
+
+      val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
+      val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
+      val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
+      val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
+      val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
+      val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+      val partitioner7 = Partitioner.defaultPartitioner(rdd1, rdd6)
+
+      assert(partitioner1.numPartitions == rdd2.getNumPartitions)
+      assert(partitioner2.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner3.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner4.numPartitions == rdd3.getNumPartitions)
+      assert(partitioner5.numPartitions == rdd4.getNumPartitions)
+      assert(partitioner6.numPartitions == sc.defaultParallelism)
+      assert(partitioner7.numPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
   }
 }
```
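
The last two assertions are the new behavior. A short arithmetic check for `partitioner7` makes the expectation concrete (values taken from the test; the variable names are illustrative):

```scala
import scala.math.log10

// partitioner7 = defaultPartitioner(rdd1, rdd6): rdd1 has 150 partitions and
// no partitioner, rdd6 carries a 3-partition HashPartitioner, and
// spark.default.parallelism is 4.
val maxUpstream = 150
val existingParts = 3
val defaultParallelism = 4
val eligible = log10(maxUpstream) - log10(existingParts) < 1 // ~1.70, so false
val beatsDefault = defaultParallelism < existingParts        // 4 < 3, so false
// Neither branch of the new condition holds, so a fresh HashPartitioner(4) is
// built and partitioner7.numPartitions == sc.defaultParallelism.
assert(!eligible && !beatsDefault)
```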

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 43 additions & 2 deletions
```diff
@@ -322,8 +322,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   // See SPARK-22465
-  test("cogroup between multiple RDD" +
-    " with number of partitions similar in order of magnitude") {
+  test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
       .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
@@ -332,6 +331,48 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
   }
 
+  test("cogroup between multiple RDD when defaultParallelism is set without proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == sc.defaultParallelism)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set with proper partitioner") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
+  test("cogroup between multiple RDD when defaultParallelism is set; with huge number of " +
+    "partitions in upstream RDDs") {
+    assert(!sc.conf.contains("spark.default.parallelism"))
+    try {
+      sc.conf.set("spark.default.parallelism", "4")
+      val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
+      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+        .partitionBy(new HashPartitioner(10))
+      val joined = rdd1.cogroup(rdd2)
+      assert(joined.getNumPartitions == rdd2.getNumPartitions)
+    } finally {
+      sc.conf.remove("spark.default.parallelism")
+    }
+  }
+
   test("rightOuterJoin") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
```
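
Taken together, the three new tests pin down a user-visible change: with `spark.default.parallelism` set low, a cogroup against a well-partitioned RDD now keeps that RDD's partitioner. A minimal standalone program exercising the same path; the object name, app name, and master are illustrative, and the partition counts mirror the "huge number of partitions" test above:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CogroupPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cogroup-partitions-demo")
      .setMaster("local[2]")
      .set("spark.default.parallelism", "4")
    val sc = new SparkContext(conf)

    val big = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
    val small = sc.parallelize(Seq((1, 1), (2, 2)))
      .partitionBy(new HashPartitioner(10))

    // The 10-partition partitioner is more than an order of magnitude smaller
    // than 1000, so it is not "eligible"; but it still beats
    // spark.default.parallelism (4), so with this patch it is reused.
    val joined = big.cogroup(small)
    println(joined.getNumPartitions) // 10 with this patch; 4 before it

    sc.stop()
  }
}
```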
