
Commit ccdc0e6

improve comments and tests.

1 parent: 62088ca

3 files changed: +13, −12 lines


core/src/main/scala/org/apache/spark/Partitioner.scala

4 additions & 4 deletions

@@ -46,10 +46,10 @@ object Partitioner {
  * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
  * as the default partitions number, otherwise we'll use the max number of upstream partitions.
  *
- * If any of the RDDs already has a partitioner, and the partitioner is an eligible one (with a
- * partitions number that is not less than the max number of upstream partitions by an order of
- * magnitude), or the number of partitions is larger than the default one, we'll choose the
- * exsiting partitioner.
+ * When available, we choose the partitioner from the RDD with the maximum number of partitions.
+ * If this partitioner is eligible (its number of partitions is within one order of magnitude of
+ * the maximum number of partitions across the RDDs), or its number of partitions is higher than
+ * the default partitions number, we use this partitioner.
  *
  * Otherwise, we'll use a new HashPartitioner with the default partitions number.
  *
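The updated comment describes a concrete selection rule. Below is a minimal, self-contained Scala sketch of that rule for illustration only; it is not Spark's implementation, and the names DefaultPartitionerSketch, isEligible, and choosePartitions, as well as the log10-based reading of "within one order of magnitude", are assumptions drawn from the comment's wording.

import scala.math.log10

object DefaultPartitionerSketch {
  // "Eligible": the candidate's partition count is within one order of
  // magnitude of the largest upstream partition count (assumed reading).
  def isEligible(candidatePartitions: Int, maxUpstreamPartitions: Int): Boolean =
    log10(maxUpstreamPartitions) - log10(candidatePartitions) < 1

  // Partition count the chosen partitioner would end up with.
  def choosePartitions(
      existingPartitions: Option[Int], // largest existing partitioner, if any
      maxUpstreamPartitions: Int,      // max partitions across the upstream RDDs
      defaultParallelism: Option[Int]  // spark.default.parallelism, if set
    ): Int = {
    val default = defaultParallelism.getOrElse(maxUpstreamPartitions)
    existingPartitions match {
      // Keep the existing partitioner when it is eligible or beats the default.
      case Some(n) if isEligible(n, maxUpstreamPartitions) || n > default => n
      // Otherwise fall back to a new HashPartitioner(default).
      case _ => default
    }
  }

  def main(args: Array[String]): Unit = {
    // 100 partitions vs. 150 upstream: within one order of magnitude, keep it.
    println(choosePartitions(Some(100), 150, Some(4))) // 100
    // 3 partitions vs. 150 upstream: not eligible and below the default of 4.
    println(choosePartitions(Some(3), 150, Some(4)))   // 4
  }
}

With these example inputs the sketch reproduces the outcomes the tests below assert: partitioner3 keeps the 100-partition partitioner, while partitioner7 falls back to the default parallelism.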

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

7 additions & 6 deletions

@@ -262,14 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
 
   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc
-      .parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc
-      .parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
 
@@ -299,20 +296,24 @@
     val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
+    val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      .partitionBy(new HashPartitioner(3))
 
     val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
     val partitioner2 = Partitioner.defaultPartitioner(rdd2, rdd3)
     val partitioner3 = Partitioner.defaultPartitioner(rdd3, rdd1)
     val partitioner4 = Partitioner.defaultPartitioner(rdd1, rdd2, rdd3)
     val partitioner5 = Partitioner.defaultPartitioner(rdd4, rdd5)
     val partitioner6 = Partitioner.defaultPartitioner(rdd5, rdd5)
+    val partitioner7 = Partitioner.defaultPartitioner(rdd1, rdd6)
 
     assert(partitioner1.numPartitions == rdd2.getNumPartitions)
     assert(partitioner2.numPartitions == rdd3.getNumPartitions)
     assert(partitioner3.numPartitions == rdd3.getNumPartitions)
     assert(partitioner4.numPartitions == rdd3.getNumPartitions)
     assert(partitioner5.numPartitions == rdd4.getNumPartitions)
     assert(partitioner6.numPartitions == sc.defaultParallelism)
+    assert(partitioner7.numPartitions == sc.defaultParallelism)
   } finally {
     sc.conf.remove("spark.default.parallelism")
   }
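The new rdd6/partitioner7 case exercises the fallback path: rdd6's HashPartitioner has only 3 partitions while rdd1 has 150, which is more than one order of magnitude apart, and 3 is also not above the configured default parallelism (the exact configured value is set earlier in this test and is not shown in this hunk), so defaultPartitioner falls back to a partitioner with sc.defaultParallelism partitions. A quick check of the arithmetic, assuming the log10 reading of the eligibility rule sketched above:

import scala.math.log10

// rdd6 vs. rdd1: 3 partitions against 150 upstream partitions.
println(log10(150) - log10(3))   // ~1.70, not < 1 => not eligible
// rdd3 vs. rdd1: 100 partitions against 150 is well within one magnitude.
println(log10(150) - log10(100)) // ~0.18, < 1 => eligible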

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

2 additions & 2 deletions

@@ -358,8 +358,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
-  test("cogroup between multiple RDD when defaultParallelism is set with huge number of " +
-    "partitions from upstream RDDs") {
+  test("cogroup between multiple RDD when defaultParallelism is set; with huge number of " +
+    "partitions in upstream RDDs") {
     assert(!sc.conf.contains("spark.default.parallelism"))
     try {
       sc.conf.set("spark.default.parallelism", "4")
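For context, here is a hedged sketch of the scenario such a test exercises, derived from the rule in the Partitioner.scala comment above; the RDD sizes and the final assertion are illustrative assumptions, not a copy of the actual test body beyond this hunk.

import org.apache.spark.HashPartitioner

// Inside a suite with a SparkContext `sc` and spark.default.parallelism = 4:
val bigRdd   = sc.parallelize((1 to 1000).map(x => (x, x)), 1000) // no partitioner
val smallRdd = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
  .partitionBy(new HashPartitioner(10))
val joined = bigRdd.cogroup(smallRdd)
// The 10-partition partitioner is not within an order of magnitude of 1000,
// but 10 > defaultParallelism (4), so under the documented rule the existing
// partitioner should still win.
assert(joined.getNumPartitions == smallRdd.getNumPartitions)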
