
Commit a9f66fb

WeichenXu123 authored and Robert Kruszewski committed
[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing
## What changes were proposed in this pull request?

- Fix a bug where RDD `zipWithIndex` generates a wrong result when one partition contains more than 2147483647 records.
- Fix a bug where RDD `zipWithUniqueId` generates a wrong result when one partition contains more than 2147483647 records.

## How was this patch tested?

Test added.

Author: WeichenXu <[email protected]>

Closes apache#15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.
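For context, Scala's `Iterator.zipWithIndex` pairs each element with an `Int` index, so the index wraps to a negative value once a single partition yields more than Int.MaxValue (2147483647) records. A minimal standalone sketch of the wraparound (illustration only, not part of this commit):

object IntIndexOverflowSketch {
  def main(args: Array[String]): Unit = {
    // Int.MaxValue is the largest index an Int-based zipWithIndex can represent.
    val lastIntIndex: Int = Int.MaxValue
    println(lastIntIndex + 1)   // -2147483648: Int arithmetic silently wraps around
    // Tracking the index as a Long, as this patch does, keeps counting correctly.
    val lastLongIndex: Long = Int.MaxValue.toLong
    println(lastLongIndex + 1L) // 2147483648
  }
}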
1 parent d867a3c · commit a9f66fb

File tree: 4 files changed (+25, -4 lines)


core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion

@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
   def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
-      iter.zipWithIndex.map { case (item, i) =>
+      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
         (item, i * n + k)
       }
     }
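The mapping `(item, i * n + k)` gives element `i` of partition `k` (with `n` partitions in total) the unique id `i * n + k`, so ids from different partitions fall into disjoint residue classes modulo `n` and can never collide. A small worked sketch with example values (3 partitions of 2 elements each; not part of the commit):

// With n = 3 partitions, partition k emits ids k, k + n, k + 2n, ...
val n = 3L
for (k <- 0L until n; i <- 0L until 2L) {
  println(s"partition $k, element $i -> unique id ${i * n + k}")
}
// partition 0 -> 0, 3; partition 1 -> 1, 4; partition 2 -> 2, 5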

core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala

Lines changed: 2 additions & 3 deletions

@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)

   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
-    }
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
   }
 }
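Unlike `zipWithUniqueId`, `zipWithIndex` assigns consecutive global indices, so each partition's `startIndex` must equal the total record count of all earlier partitions (which is why `ZippedWithIndexRDD` first counts the preceding partitions). A standalone sketch of how the offsets compose, using hypothetical in-memory data rather than the commit's code:

// Per-partition start offsets are a prefix sum of partition sizes.
val parts = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))
val startIndexes = parts.map(_.size.toLong).scanLeft(0L)(_ + _)
val indexed = parts.zip(startIndexes).flatMap { case (part, start) =>
  part.zipWithIndex.map { case (item, i) => (item, start + i) }
}
println(indexed) // List((a,0), (b,1), (c,2), (d,3), (e,4))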

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 15 additions & 0 deletions

@@ -1759,6 +1759,21 @@ private[spark] object Utils extends Logging {
     count
   }

+  /**
+   * Generate a zipWithIndex iterator, avoid index value overflowing problem
+   * in scala's zipWithIndex
+   */
+  def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+    new Iterator[(T, Long)] {
+      var index: Long = startIndex - 1L
+      def hasNext: Boolean = iterator.hasNext
+      def next(): (T, Long) = {
+        index += 1L
+        (iterator.next(), index)
+      }
+    }
+  }
+
   /**
    * Creates a symlink.
    *
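A quick usage sketch of the new helper (standalone, assuming `Utils.getIteratorZipWithIndex` is in scope as defined above): starting the Long index just below Int.MaxValue shows the boundary being crossed without wraparound, the same trick the test below uses:

val it = Utils.getIteratorZipWithIndex(Iterator("a", "b", "c"), Int.MaxValue - 1L)
it.foreach(println)
// (a,2147483646)
// (b,2147483647)
// (c,2147483648)  <- one past Int.MaxValue; an Int index would have wrapped to -2147483648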

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 7 additions & 0 deletions

@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }

+  test("getIteratorZipWithIndex") {
+    val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+    assert(iterator.toArray === Array(
+      (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+    ))
+  }
+
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
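Note that the test starts at `-1L + Int.MaxValue` deliberately: the three returned indices straddle the Int.MaxValue boundary, demonstrating that no wraparound occurs without having to drive the iterator through more than two billion elements.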
