@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
117117 if (numSlices < 1 ) {
118118 throw new IllegalArgumentException (" Positive number of slices required" )
119119 }
120+ // Sequences need to be sliced at same positions for operations
121+ // like RDD.zip() to behave as expected
122+ def positions (length : Long , numSlices : Int ): Seq [(Int , Int )] = {
123+ (0 until numSlices).map(i => {
124+ val start = ((i * length) / numSlices).toInt
125+ val end = (((i + 1 ) * length) / numSlices).toInt
126+ (start, end)
127+ })
128+ }
120129 seq match {
121130 case r : Range .Inclusive => {
122131 val sign = if (r.step < 0 ) {
@@ -128,29 +137,27 @@ private object ParallelCollectionRDD {
128137 r.start, r.end + sign, r.step).asInstanceOf [Seq [T ]], numSlices)
129138 }
130139 case r : Range => {
131- (0 until numSlices).map(i => {
132- val start = ((i * r.length.toLong) / numSlices).toInt
133- val end = (((i + 1 ) * r.length.toLong) / numSlices).toInt
134- new Range (r.start + start * r.step, r.start + end * r.step, r.step)
140+ positions(r.length, numSlices).map({
141+ case (start, end) =>
142+ new Range (r.start + start * r.step, r.start + end * r.step, r.step)
135143 }).asInstanceOf [Seq [Seq [T ]]]
136144 }
137145 case nr : NumericRange [_] => {
138146 // For ranges of Long, Double, BigInteger, etc
139147 val slices = new ArrayBuffer [Seq [T ]](numSlices)
140- val sliceSize = (nr.size + numSlices - 1 ) / numSlices // Round up to catch everything
141148 var r = nr
142- for (i <- 0 until numSlices) {
149+ for ((start, end) <- positions(nr.length, numSlices)) {
150+ val sliceSize = end - start
143151 slices += r.take(sliceSize).asInstanceOf [Seq [T ]]
144152 r = r.drop(sliceSize)
145153 }
146154 slices
147155 }
148156 case _ => {
149157 val array = seq.toArray // To prevent O(n^2) operations for List etc
150- (0 until numSlices).map(i => {
151- val start = ((i * array.length.toLong) / numSlices).toInt
152- val end = (((i + 1 ) * array.length.toLong) / numSlices).toInt
153- array.slice(start, end).toSeq
158+ positions(array.length, numSlices).map({
159+ case (start, end) =>
160+ array.slice(start, end).toSeq
154161 })
155162 }
156163 }
0 commit comments