@@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                         other._jrdd_deserializer)
         return RDD(pairRDD, self.ctx, deserializer)
 
+    def zipWithIndex(self):
+        """
+        Zips this RDD with its element indices.
+
+        The ordering is first based on the partition index and then the
+        ordering of items within each partition. So the first item in
+        the first partition gets index 0, and the last item in the last
+        partition receives the largest index.
+
+        This method needs to trigger a Spark job when this RDD contains
+        more than one partition.
+
+        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
+        [(0, 0), (1, 1), (2, 2), (3, 3)]
+        """
+        starts = [0]
+        if self.getNumPartitions() > 1:
+            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
+            for i in range(len(nums) - 1):
+                starts.append(starts[-1] + nums[i])
+
+        def func(k, it):
+            return enumerate(it, starts[k])
+
+        return self.mapPartitionsWithIndex(func)
+
+    def zipWithUniqueId(self):
+        """
+        Zips this RDD with generated unique Long ids.
+
+        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+        n is the number of partitions. So there may exist gaps, but this
+        method won't trigger a Spark job, which is different from
+        L{zipWithIndex}.
+
+        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
+        [(0, 0), (2, 1), (1, 2), (3, 3)]
+        """
+        n = self.getNumPartitions()
+
+        def func(k, it):
+            for i, v in enumerate(it):
+                yield i * n + k, v
+
+        return self.mapPartitionsWithIndex(func)
+
     def name(self):
         """
         Return the name of this RDD.
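
A minimal pure-Python sketch (not part of the patch) of how the two indexing schemes above assign ids, assuming range(4) is split evenly into two partitions held as plain lists. The names `partitions`, `with_index`, and `with_unique_id` are illustrative only.

    # zipWithIndex-style: per-partition start offsets are computed first; on a
    # real RDD this counting pass is the extra Spark job the docstring mentions.
    partitions = [[0, 1], [2, 3]]   # two partitions, as in the doctests above
    sizes = [len(p) for p in partitions]
    starts = [0]
    for s in sizes[:-1]:
        starts.append(starts[-1] + s)
    with_index = [(starts[k] + i, v)
                  for k, part in enumerate(partitions)
                  for i, v in enumerate(part)]

    # zipWithUniqueId-style: the id i * n + k depends only on the partition
    # index k and the number of partitions n, so no counting pass is needed.
    n = len(partitions)
    with_unique_id = [(i * n + k, v)
                      for k, part in enumerate(partitions)
                      for i, v in enumerate(part)]

    print(with_index)      # [(0, 0), (1, 1), (2, 2), (3, 3)]
    print(with_unique_id)  # [(0, 0), (2, 1), (1, 2), (3, 3)]

The contrast shows the trade-off made in the patch: contiguous indices require knowing every partition's size up front, while the k, n+k, 2*n+k scheme can leave gaps but is computed locally in a single pass.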