Commit 0115516

darabos authored and srowen committed
[SPARK-8893] Add runtime checks against non-positive number of partitions

https://issues.apache.org/jira/browse/SPARK-8893

> What does `sc.parallelize(1 to 3).repartition(p).collect` return? I would expect `Array(1, 2, 3)` regardless of `p`. But if `p` < 1, it returns `Array()`. I think instead it should throw an `IllegalArgumentException`.
>
> I think the case is pretty clear for `p` < 0. But the behavior for `p` = 0 is also error prone. In fact that's how I found this strange behavior. I used `rdd.repartition(a/b)` with positive `a` and `b`, but `a/b` was rounded down to zero and the results surprised me. I'd prefer an exception instead of unexpected (corrupt) results.

Author: Daniel Darabos <[email protected]>

Closes apache#7285 from darabos/patch-1 and squashes the following commits:

decba82 [Daniel Darabos] Allow repartitioning empty RDDs to zero partitions.
97de852 [Daniel Darabos] Allow zero partition count in HashPartitioner
f6ba5fb [Daniel Darabos] Use require() for simpler syntax.
d5e3df8 [Daniel Darabos] Require positive number of partitions in HashPartitioner
897c628 [Daniel Darabos] Require positive maxPartitions in CoalescedRDD
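The integer-division pitfall the reporter describes can be reproduced without Spark. A minimal sketch (the variable names `a`, `b`, `p` follow the commit message; `DivisionPitfall` is an illustrative wrapper, not part of the commit):

```scala
// Integer division silently rounds a positive ratio down to zero,
// which is how a call like rdd.repartition(a / b) can end up
// requesting zero partitions without any warning.
object DivisionPitfall {
  def main(args: Array[String]): Unit = {
    val a = 3
    val b = 7
    val p = a / b // integer division: 3 / 7 == 0
    assert(p == 0, "positive a and b, yet a / b is zero")
    println(p)
  }
}
```

Before this commit, passing that zero on to `repartition` returned an empty RDD; after it, construction fails fast with an `IllegalArgumentException`.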
1 parent 0a79533 commit 0115516

File tree

2 files changed (+6, -1 lines)

core/src/main/scala/org/apache/spark/Partitioner.scala (2 additions, 0 deletions)

@@ -76,6 +76,8 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
+  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
   def numPartitions: Int = partitions
 
   def getPartition(key: Any): Int = key match {
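A standalone sketch of the guard pattern used above: Scala's `require` throws `IllegalArgumentException` when its predicate is false, so an invalid count fails at construction time. `SimpleHashPartitioner` is an illustrative stand-in, not Spark's actual class:

```scala
// Sketch of the constructor guard: require() rejects negative counts
// up front instead of letting them corrupt results later.
class SimpleHashPartitioner(val partitions: Int) {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  // Keep the result in [0, numPartitions), even for negative hashCodes.
  def getPartition(key: Any): Int = {
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }
}

object GuardDemo {
  def main(args: Array[String]): Unit = {
    val ok = new SimpleHashPartitioner(3)
    val p = ok.getPartition("spark")
    assert(p >= 0 && p < 3)
    val rejected =
      try { new SimpleHashPartitioner(-1); false }
      catch { case _: IllegalArgumentException => true }
    assert(rejected)
    println("guard works")
  }
}
```

Note that the diff deliberately uses `>= 0`, not `> 0`: one of the squashed commits ("Allow zero partition count in HashPartitioner") keeps zero legal, so only negative counts are rejected here.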

core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala (4 additions, 1 deletion)

@@ -69,7 +69,7 @@ private[spark] case class CoalescedRDDPartition(
  * the preferred location of each new partition overlaps with as many preferred locations of its
  * parent partitions
  * @param prev RDD to be coalesced
- * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
@@ -78,6 +78,9 @@ private[spark] class CoalescedRDD[T: ClassTag](
     balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
 
+  require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
+    s"Number of partitions ($maxPartitions) must be positive.")
+
   override def getPartitions: Array[Partition] = {
     val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
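The disjunctive condition in this hunk permits exactly one zero case: coalescing an RDD that already has zero partitions (per the squashed commit "Allow repartitioning empty RDDs to zero partitions"). A hypothetical standalone helper (`checkMaxPartitions` is not in the commit) makes the three cases explicit:

```scala
object CoalesceGuard {
  // Mirrors the require() added in the diff: positive targets are always
  // accepted; zero is accepted only when the parent RDD is already empty.
  def checkMaxPartitions(maxPartitions: Int, parentPartitionCount: Int): Unit =
    require(maxPartitions > 0 || maxPartitions == parentPartitionCount,
      s"Number of partitions ($maxPartitions) must be positive.")

  def main(args: Array[String]): Unit = {
    checkMaxPartitions(2, 100) // shrinking a non-empty RDD: allowed
    checkMaxPartitions(0, 0)   // empty RDD to zero partitions: allowed
    val rejected =
      try { checkMaxPartitions(0, 3); false }
      catch { case _: IllegalArgumentException => true }
    assert(rejected)           // zero target on a non-empty RDD: rejected
    println("ok")
  }
}
```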
