From dd91e08a92ebace863506cdfe52114ffeec894c9 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 21 Aug 2014 21:56:27 -0700
Subject: [PATCH 1/4] add `comp` argument for RDD.max() and RDD.min()

---
 python/pyspark/rdd.py | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3eefc878d274e..6c52aaaef1599 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -810,23 +810,45 @@ def func(iterator):
 
         return self.mapPartitions(func).fold(zeroValue, combOp)
 
-    def max(self):
+    def max(self, comp=None):
         """
         Find the maximum item in this RDD.
 
-        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+        @param comp: A function used to compare two elements, the builtin `cmp`
+                     will be used by default.
+
+        >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
+        >>> rdd.max()
         43.0
+        >>> rdd.max(lambda a, b: cmp(str(a), str(b)))
+        5.0
         """
-        return self.reduce(max)
+        if comp is not None:
+            func = lambda a, b: a if comp(a, b) >= 0 else b
+        else:
+            func = max
 
-    def min(self):
+        return self.reduce(func)
+
+    def min(self, comp=None):
         """
         Find the minimum item in this RDD.
 
-        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
-        1.0
+        @param comp: A function used to compare two elements, the builtin `cmp`
+                     will be used by default.
+
+        >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
+        >>> rdd.min()
+        2.0
+        >>> rdd.min(lambda a, b: cmp(str(a), str(b)))
+        10.0
         """
-        return self.reduce(min)
+        if comp is not None:
+            func = lambda a, b: a if comp(a, b) <= 0 else b
+        else:
+            func = min
+
+        return self.reduce(func)
 
     def sum(self):
         """

From 2f63512e10a608722c1e8cd9ab5d22124d389a5d Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 23 Aug 2014 10:52:35 -0700
Subject: [PATCH 2/4] change `comp` to `key` in min/max

---
 python/pyspark/rdd.py | 28 ++++++++--------------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6c52aaaef1599..faf84d2bab355 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -810,45 +810,33 @@ def func(iterator):
 
         return self.mapPartitions(func).fold(zeroValue, combOp)
 
-    def max(self, comp=None):
+    def max(self, key=None):
         """
         Find the maximum item in this RDD.
 
-        @param comp: A function used to compare two elements, the builtin `cmp`
-                     will be used by default.
+        @param key: A function used to generate key for comparing
 
         >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
         >>> rdd.max()
         43.0
-        >>> rdd.max(lambda a, b: cmp(str(a), str(b)))
+        >>> rdd.max(key=str)
         5.0
         """
-        if comp is not None:
-            func = lambda a, b: a if comp(a, b) >= 0 else b
-        else:
-            func = max
-
-        return self.reduce(func)
+        return self.reduce(lambda a, b: max(a, b, key=key))
 
-    def min(self, comp=None):
+    def min(self, key=None):
         """
         Find the minimum item in this RDD.
 
-        @param comp: A function used to compare two elements, the builtin `cmp`
-                     will be used by default.
+        @param key: A function used to generate key for comparing
 
         >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
        >>> rdd.min()
         2.0
-        >>> rdd.min(lambda a, b: cmp(str(a), str(b)))
+        >>> rdd.min(key=str)
         10.0
         """
-        if comp is not None:
-            func = lambda a, b: a if comp(a, b) <= 0 else b
-        else:
-            func = min
-
-        return self.reduce(func)
+        return self.reduce(lambda a, b: min(a, b, key=key))
 
     def sum(self):
         """

From ad7e374bd834d1e789ff95bba09f0c87ba67c4fd Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 23 Aug 2014 14:54:14 -0700
Subject: [PATCH 3/4] fix tests

---
 python/pyspark/rdd.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index faf84d2bab355..18261bf98f13a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -822,6 +822,8 @@ def max(self, key=None):
         >>> rdd.max(key=str)
         5.0
         """
+        if key is None:
+            return self.reduce(max)
         return self.reduce(lambda a, b: max(a, b, key=key))
 
     def min(self, key=None):
@@ -836,6 +838,8 @@ def min(self, key=None):
         >>> rdd.min(key=str)
         10.0
         """
+        if key is None:
+            return self.reduce(min)
         return self.reduce(lambda a, b: min(a, b, key=key))
 
     def sum(self):
         """

From ccbaf25ce6d601bcbc7cb6081128c2b4236925ad Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 23 Aug 2014 15:00:37 -0700
Subject: [PATCH 4/4] add `key` to top()

---
 python/pyspark/rdd.py | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 18261bf98f13a..bdd8bc82869fb 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -938,7 +938,7 @@ def mergeMaps(m1, m2):
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
 
-    def top(self, num):
+    def top(self, num, key=None):
         """
         Get the top N elements from a RDD.
 
@@ -947,20 +947,16 @@ def top(self, num):
         [12]
         >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
         [6, 5]
+        >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
+        [4, 3, 2]
         """
         def topIterator(iterator):
-            q = []
-            for k in iterator:
-                if len(q) < num:
-                    heapq.heappush(q, k)
-                else:
-                    heapq.heappushpop(q, k)
-            yield q
+            yield heapq.nlargest(num, iterator, key=key)
 
         def merge(a, b):
-            return next(topIterator(a + b))
+            return heapq.nlargest(num, a + b, key=key)
 
-        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
+        return self.mapPartitions(topIterator).reduce(merge)
 
     def takeOrdered(self, num, key=None):
         """
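
Net effect of the series: RDD.max(), RDD.min(), and RDD.top() all accept an
optional `key` function, mirroring the builtin max()/min()/sorted(). A minimal
usage sketch of the resulting API, assuming a live SparkContext `sc` exactly
as the doctests above do (the sample data is taken from the patch 4 doctest):

    rdd = sc.parallelize([10, 4, 2, 12, 3])

    rdd.max()            # 12 -- plain numeric comparison
    rdd.max(key=str)     # 4  -- "4" is the largest of "10", "4", "2", "12", "3"
    rdd.min(key=str)     # 10 -- "10" sorts first among those strings
    rdd.top(3, key=str)  # [4, 3, 2] -- the doctest added in patch 4

The `if key is None` guard from patch 3 appears to be what "fix tests" refers
to: on Python 2, max(a, b, key=None) raises TypeError rather than falling back
to plain comparison, so the no-argument doctests (rdd.max(), rdd.min()) would
fail without it.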