core/src/main/scala/org/apache/spark/SparkContext.scala (10 changes: 6 additions & 4 deletions)
@@ -520,10 +520,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is
* altered after the call to parallelize and before the first action on the
* RDD, the resultant RDD will reflect the modified collection. Pass a copy of
* the argument to avoid this.
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
*
* @note When splitting Range, the sub Range is exclusive. However the last slice for inclusive
Member:

Is it necessary for callers to know this? It seems like purely an implementation detail that could be documented inside the function later. Same for the scaladoc later.
I would say "the sub Ranges in each slice are exclusive"
Otherwise at a glance this looks good.

Contributor Author:

Thanks for your comment. I will update the docs if we decide to add them.

Contributor:

I agree. I would rather leave this out. I'll delete this when I merge.

* Range is inclusive.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
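To make the @note above concrete, here is a minimal driver-side sketch (not part of this diff; the local SparkContext setup and collection names are assumed for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

// Minimal sketch, assuming a local SparkContext.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("parallelize-note"))

val data = ArrayBuffer(1, 2, 3)
val rdd = sc.parallelize(data)           // lazy: the collection is not sliced yet
data += 4                                // mutate before the first action...
println(rdd.collect().mkString(","))     // ...and the RDD reflects it: 1,2,3,4

val frozen = sc.parallelize(data.toList) // pass a copy to avoid this
data += 5
println(frozen.collect().mkString(","))  // still 1,2,3,4

sc.stop()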
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers.
* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
* is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
@@ -127,19 +128,15 @@
})
}
seq match {
case r: Range.Inclusive => {
val sign = if (r.step < 0) {
-1
} else {
1
}
slice(new Range(
r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
}
case r: Range => {
positions(r.length, numSlices).map({
case (start, end) =>
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
Contributor:

No need to match multiple cases here if we just ignore the index for the non-inclusive case. I think it's sufficient to do

positions(...).zipWithIndex.map { case ((start, end), index) =>
  // If the range is inclusive, include the last element in the last slice
  if (r.isInclusive && index == numSlices - 1) {
    new Range.Inclusive(r.start + start * r.step, r.end, r.step)
  } else {
    new Range(... as before ...)
  }
}

Contributor Author:

No doubt your version is more straightforward than mine. When I wrote my code, I didn't consider splitting a normal inclusive range using an inclusive range. However, the benefit of my implementation is that the splitting result stays the same as in master for normal inclusive ranges; I wonder whether some Spark code relies on the exclusive range output. And of course, I think we should update the corresponding documentation for this kind of change.

I will convert the pattern matching to one case and update the implementation once we decide which one fits better.

Contributor:

I think as long as we don't change the behavior, it's preferable to rewrite it in a readable manner. Here it's pretty clear to me that if the range is inclusive, we should include the last element in the last slice, regardless of whether the range ends in a special value like Int.MaxValue.
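To make the Int.MaxValue point concrete, a tiny illustration (not from the patch) of why converting an inclusive Range into an exclusive one, as the removed code did via r.end + sign, breaks down at the boundary:

// For `1 to Int.MaxValue`, the exclusive end would have to be Int.MaxValue + 1,
// which overflows a 32-bit Int:
val exclusiveEnd = Int.MaxValue + 1      // wraps around to Int.MinValue
// new Range(1, exclusiveEnd, 1) is therefore empty instead of holding
// Int.MaxValue elements; keeping the last slice inclusive sidesteps the overflow.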

}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
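As a usage sketch of the new slicing behavior described in the scaladoc above (assuming the patch is applied; ParallelCollectionRDD is a private object, so this only works from inside the same package, as the test suite below does):

// Hypothetical walkthrough of slice() on an inclusive Range.
val slices = ParallelCollectionRDD.slice(1 to 100, 3)
// slices(0): 1 until 34   -> elements 1..33  (exclusive sub-range)
// slices(1): 34 until 67  -> elements 34..66 (exclusive sub-range)
// slices(2): 67 to 100    -> the last slice stays inclusive
assert(slices(2).isInstanceOf[Range.Inclusive])
assert(slices.map(_.size).sum == 100)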
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(0).mkString(",") === (0 to 32).mkString(","))
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
assert(slices(2).mkString(",") === (67 to 100).mkString(","))
assert(slices(2).isInstanceOf[Range.Inclusive])
}

test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}

test("inclusive ranges with Int.MaxValue and Int.MinValue") {
val data1 = 1 to Int.MaxValue
val slices1 = ParallelCollectionRDD.slice(data1, 3)
assert(slices1.size === 3)
assert(slices1.map(_.size).sum === Int.MaxValue)
assert(slices1(2).isInstanceOf[Range.Inclusive])
val data2 = -2 to Int.MinValue by -1
val slices2 = ParallelCollectionRDD.slice(data2, 3)
assert(slices2.size == 3)
assert(slices2.map(_.size).sum === Int.MaxValue)
assert(slices2(2).isInstanceOf[Range.Inclusive])
}

test("empty ranges with Int.MaxValue and Int.MinValue") {
val data1 = Int.MaxValue until Int.MaxValue
val slices1 = ParallelCollectionRDD.slice(data1, 5)
assert(slices1.size === 5)
for (i <- 0 until 5) assert(slices1(i).size === 0)
val data2 = Int.MinValue until Int.MinValue
val slices2 = ParallelCollectionRDD.slice(data2, 5)
assert(slices2.size === 5)
for (i <- 0 until 5) assert(slices2(i).size === 0)
}
}