From 0d2a128da7f24214e6714a93f7402aaea93075f2 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 21 Aug 2014 21:42:00 -0700
Subject: [PATCH 1/2] add zipWithIndex() and zipWithUniqueId()

---
 python/pyspark/rdd.py | 46 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3eefc878d274e..3449eaa599e84 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1715,6 +1715,52 @@ def batch_as(rdd, batchSize):
                                         other._jrdd_deserializer)
         return RDD(pairRDD, self.ctx, deserializer)
 
+    def zipWithIndex(self):
+        """
+        Zips this RDD with its element indices.
+
+        The ordering is first based on the partition index and then the
+        ordering of items within each partition. So the first item in
+        the first partition gets index 0, and the last item in the last
+        partition receives the largest index.
+
+        This method needs to trigger a Spark job when this RDD contains
+        more than one partition.
+
+        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
+        [(0, 0), (1, 1), (2, 2), (3, 3)]
+        """
+        starts = [0]
+        if self.getNumPartitions() > 1:
+            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
+            for i in range(len(nums) - 1):
+                starts.append(starts[-1] + nums[i])
+
+        def func(k, it):
+            return enumerate(it, starts[k])
+
+        return self.mapPartitionsWithIndex(func)
+
+    def zipWithUniqueId(self):
+        """
+        Zips this RDD with generated unique Long ids.
+
+        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+        n is the number of partitions. So there may exist gaps, but this
+        method won't trigger a Spark job, which is different from
+        L{zipWithIndex}.
+
+        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
+        [(0, 0), (2, 1), (1, 2), (3, 3)]
+        """
+        n = self.getNumPartitions()
+
+        def func(k, it):
+            for i, v in enumerate(it):
+                yield i * n + k, v
+
+        return self.mapPartitionsWithIndex(func)
+
     def name(self):
         """
         Return the name of this RDD.

From cebe5bfe263baf3349353f1473f097396821514a Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 23 Aug 2014 19:41:16 -0700
Subject: [PATCH 2/2] improve test cases, reverse the order of index

---
 python/pyspark/rdd.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3449eaa599e84..f0706d846d014 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1727,8 +1727,8 @@ def zipWithIndex(self):
         This method needs to trigger a Spark job when this RDD contains
         more than one partition.
 
-        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
-        [(0, 0), (1, 1), (2, 2), (3, 3)]
+        >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
+        [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
         """
         starts = [0]
         if self.getNumPartitions() > 1:
@@ -1737,7 +1737,8 @@ def zipWithIndex(self):
                 starts.append(starts[-1] + nums[i])
 
         def func(k, it):
-            return enumerate(it, starts[k])
+            for i, v in enumerate(it, starts[k]):
+                yield v, i
 
         return self.mapPartitionsWithIndex(func)
 
@@ -1750,14 +1751,14 @@ def zipWithUniqueId(self):
         method won't trigger a Spark job, which is different from
         L{zipWithIndex}.
 
-        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
-        [(0, 0), (2, 1), (1, 2), (3, 3)]
+        >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
+        [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
         """
         n = self.getNumPartitions()
 
         def func(k, it):
             for i, v in enumerate(it):
-                yield i * n + k, v
+                yield v, i * n + k
 
         return self.mapPartitionsWithIndex(func)
 
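
For reference, here is a minimal pure-Python sketch of the two indexing schemes the patched methods compute (as of PATCH 2/2). The partition contents below are a hypothetical example; in PySpark the per-partition loops run inside mapPartitionsWithIndex rather than over plain lists.

    # Pure-Python sketch; partition contents are hypothetical.
    partitions = [["a"], ["b", "c"], ["d", "e"]]  # 5 items split across 3 partitions
    n = len(partitions)

    # zipWithIndex: consecutive indices across partitions, which requires the
    # size of every earlier partition first (the extra Spark job mentioned above).
    sizes = [len(p) for p in partitions]
    starts = [0]
    for s in sizes[:-1]:
        starts.append(starts[-1] + s)
    with_index = [(v, starts[k] + i)
                  for k, part in enumerate(partitions)
                  for i, v in enumerate(part)]
    print(with_index)      # [('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4)]

    # zipWithUniqueId: item i of partition k gets id i * n + k, so no partition
    # sizes are needed and no job is triggered, at the cost of gaps in the ids.
    with_unique_id = [(v, i * n + k)
                      for k, part in enumerate(partitions)
                      for i, v in enumerate(part)]
    print(with_unique_id)  # [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]

The trade-off described in the docstrings is visible here: zipWithIndex has to learn every partition's size before it can assign indices, while zipWithUniqueId derives an id from the partition index alone, so it stays lazy but leaves gaps.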