
Commit e38cb29

advancedxy authored and Andrew Or committed
[SPARK-5201][CORE] deal with int overflow in the ParallelCollectionRDD.slice method
There is an int overflow in the `ParallelCollectionRDD.slice` method, originally reported by SaintBacchus:

```
sc.makeRDD(1 to (Int.MaxValue)).count      // result = 0
sc.makeRDD(1 to (Int.MaxValue - 1)).count  // result = 2147483646 = Int.MaxValue - 1
sc.makeRDD(1 until (Int.MaxValue)).count   // result = 2147483646 = Int.MaxValue - 1
```

See #2874 for more details. This PR tries to fix the overflow. However, there is another issue it does not address:

```
val largeRange = Int.MinValue to Int.MaxValue
largeRange.length // throws java.lang.IllegalArgumentException: -2147483648 to 2147483647 by 1: seqs cannot contain more than Int.MaxValue elements.
```

So the range we feed to `sc.makeRDD` cannot contain more than `Int.MaxValue` elements. This is a limitation of Scala. We may want to support that kind of range eventually, but the fix is beyond this PR.

srowen andrewor14, would you mind taking a look at this PR?

Author: Ye Xianjin <[email protected]>

Closes #4002 from advancedxy/SPARk-5201 and squashes the following commits:

96265a1 [Ye Xianjin] Update slice method comment and some responding docs.
e143d7a [Ye Xianjin] Update inclusive range check for splitting inclusive range.
b3f5577 [Ye Xianjin] We can include the last element in the last slice in general for inclusive range, hence eliminate the need to check Int.MaxValue or Int.MinValue.
7d39b9e [Ye Xianjin] Convert the two cases pattern matching to one case.
651c959 [Ye Xianjin] rename sign to needsInclusiveRange. add some comments
196f8a8 [Ye Xianjin] Add test cases for ranges end with Int.MaxValue or Int.MinValue
e66e60a [Ye Xianjin] Deal with inclusive and exclusive ranges in one case. If the range is inclusive and the end of the range is (Int.MaxValue or Int.MinValue), we should use inclusive range instead of exclusive
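For context, here is a minimal standalone sketch, not part of the commit, of where the `result = 0` comes from (`OverflowSketch` is an illustrative name). The pre-fix `slice` converted an inclusive `Range` to an exclusive one via `r.end + sign`, and for `1 to Int.MaxValue` that addition wraps around to `Int.MinValue`:

```scala
// Sketch: why the pre-fix inclusive-to-exclusive conversion yields count = 0.
object OverflowSketch {
  def main(args: Array[String]): Unit = {
    val end = Int.MaxValue
    println(end + 1) // -2147483648: Int addition silently wraps around

    // The exclusive range built from the wrapped end is empty, so every
    // slice is empty and count returns 0.
    val broken = Range(1, end + 1) // effectively Range(1, Int.MinValue)
    println(broken.length) // 0
  }
}
```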
1 parent 89a0990 commit e38cb29

File tree: 3 files changed (+37, −16 lines)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 4 deletions
```diff
@@ -514,10 +514,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /** Distribute a local Scala collection to form an RDD.
    *
-   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
-   * altered after the call to parallelize and before the first action on the
-   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
-   * the argument to avoid this.
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+   * modified collection. Pass a copy of the argument to avoid this.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
```

core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala

Lines changed: 9 additions & 12 deletions
```diff
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
   /**
    * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
    * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
-   * it efficient to run Spark over RDDs representing large sets of numbers.
+   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
+   * is an inclusive Range, we use inclusive range for the last slice.
    */
   def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
     if (numSlices < 1) {
@@ -127,19 +128,15 @@ private object ParallelCollectionRDD {
       })
     }
     seq match {
-      case r: Range.Inclusive => {
-        val sign = if (r.step < 0) {
-          -1
-        } else {
-          1
-        }
-        slice(new Range(
-          r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
-      }
       case r: Range => {
-        positions(r.length, numSlices).map({
-          case (start, end) =>
+        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+          // If the range is inclusive, use inclusive range for the last slice
+          if (r.isInclusive && index == numSlices - 1) {
+            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
+          }
+          else {
             new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+          }
         }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {
```
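To make the new control flow easier to experiment with outside Spark, here is a simplified standalone sketch of the fixed logic (`SliceSketch` and `sliceRange` are illustrative names, not Spark's; `positions` is inlined here with the proportional index split used by the helper defined just above this hunk):

```scala
// Sketch of the fixed slicing: the last slice of an inclusive Range stays
// inclusive, so r.end + 1 (the overflow source) is never computed.
object SliceSketch {
  // Proportionally split indices [0, length) into numSlices (start, end) pairs.
  private def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
    }

  def sliceRange(r: Range, numSlices: Int): Seq[Range] =
    positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
      if (r.isInclusive && index == numSlices - 1) {
        // Last slice of an inclusive range: reuse r.end directly.
        Range.inclusive(r.start + start * r.step, r.end, r.step)
      } else {
        Range(r.start + start * r.step, r.start + end * r.step, r.step)
      }
    }.toSeq

  def main(args: Array[String]): Unit = {
    val slices = sliceRange(1 to Int.MaxValue, 3)
    println(slices.map(_.size.toLong).sum) // 2147483647: no elements lost
    println(slices.last.isInclusive)       // true
  }
}
```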

core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala

Lines changed: 25 additions & 0 deletions
```diff
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 66).mkString(","))
     assert(slices(2).mkString(",") === (67 to 100).mkString(","))
+    assert(slices(2).isInstanceOf[Range.Inclusive])
   }
 
   test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
+
+  test("inclusive ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = 1 to Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 3)
+    assert(slices1.size === 3)
+    assert(slices1.map(_.size).sum === Int.MaxValue)
+    assert(slices1(2).isInstanceOf[Range.Inclusive])
+    val data2 = -2 to Int.MinValue by -1
+    val slices2 = ParallelCollectionRDD.slice(data2, 3)
+    assert(slices2.size === 3)
+    assert(slices2.map(_.size).sum === Int.MaxValue)
+    assert(slices2(2).isInstanceOf[Range.Inclusive])
+  }
+
+  test("empty ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = Int.MaxValue until Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 5)
+    assert(slices1.size === 5)
+    for (i <- 0 until 5) assert(slices1(i).size === 0)
+    val data2 = Int.MinValue until Int.MinValue
+    val slices2 = ParallelCollectionRDD.slice(data2, 5)
+    assert(slices2.size === 5)
+    for (i <- 0 until 5) assert(slices2(i).size === 0)
+  }
 }
```
