From 49e6ba704e532396b40a328698e6146f52e366b1 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 14 Mar 2014 11:20:13 +0530
Subject: [PATCH 1/4] SPARK-1162 added takeOrdered to pyspark

---
 python/pyspark/rdd.py | 93 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 87 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 019c249699c2d..966e7e536fdaa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,8 @@
 from tempfile import NamedTemporaryFile
 from threading import Thread
 import warnings
-from heapq import heappush, heappop, heappushpop
+import heapq
+import bisect
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -41,9 +42,9 @@
 
 from py4j.java_collections import ListConverter, MapConverter
 
-
 __all__ = ["RDD"]
 
+
 def _extract_concise_traceback():
     """
     This function returns the traceback info for a callsite, returns a dict
@@ -91,6 +92,58 @@ def __exit__(self, type, value, tb):
         if _spark_stack_depth == 0:
             self._context._jsc.setCallSite(None)
 
+class MaxHeapQ(object):
+    """
+    An implementation of MaxHeap.
+
+    """
+
+    def __init__(self):
+        # we start from q[1], this makes calculating children as trivial as 2 * k
+        self.q = [0]
+
+    def _swim(self, k):
+        while (k > 1) and (self.q[k/2] < self.q[k]):
+            self._swap(k, k/2)
+            k = k/2
+
+    def _swap(self, i, j):
+        t = self.q[i]
+        self.q[i] = self.q[j]
+        self.q[j] = t
+
+    def _sink(self, k):
+        N=len(self.q)-1
+        while 2*k <= N:
+            j = 2*k
+            # Here we test if both children are greater than parent
+            # if not swap with larger one.
+            if j < N and self.q[j] < self.q[j+1]:
+                j = j + 1
+            if(self.q[k] > self.q[j]):
+                break
+            self._swap(k, j)
+            k = j
+
+    def insert(self, value):
+        self.q.append(value)
+        self._swim(len(self.q) - 1)
+
+    def getQ(self):
+        return self.q[1:]
+
+    def replaceRoot(self, value):
+        if(self.q[1] > value):
+            self.q[1] = value
+            self._sink(1)
+
+    def delMax(self):
+        r = self.q[1]
+        self.q[1] = self.q[len(self.q) - 1]
+        self.q.pop()
+        self._sink(1)
+        return r
+
 class RDD(object):
     """
     A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
@@ -696,16 +749,16 @@ def top(self, num):
         Note: It returns the list sorted in descending order.
         >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
         [12]
-        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
-        [6, 5]
+        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
+        [5, 6]
         """
         def topIterator(iterator):
             q = []
             for k in iterator:
                 if len(q) < num:
-                    heappush(q, k)
+                    heapq.heappush(q, k)
                 else:
-                    heappushpop(q, k)
+                    heapq.heappushpop(q, k)
             yield q
 
         def merge(a, b):
@@ -713,6 +766,34 @@ def merge(a, b):
 
         return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
 
+    def takeOrdered(self, num, key=None):
+        """
+        Get the N elements from a RDD ordered in ascending order or as specified
+        by the optional key function.
+
+        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
+        [1, 2, 3, 4, 5, 6]
+        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
+        [(-10, 10), (-9, 9), (-7, 7), (-6, 6), (-5, 5), (-4, 4)]
+        """
+
+        def topNKeyedElems(iterator, key_=None):
+            q = MaxHeapQ()
+            for k in iterator:
+                if not (key_ == None):
+                    k = (key_(k), k)
+                if (len(q.q) -1) < num:
+                    q.insert(k)
+                else:
+                    q.replaceRoot(k)
+            yield q.getQ()
+
+        def merge(a, b):
+            return next(topNKeyedElems(a + b))
+
+        return sorted(self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge))
+
+
     def take(self, num):
         """
         Take the first num elements of the RDD.
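For reference (not part of the patch): the per-partition step in takeOrdered above keeps only the num smallest elements seen, and the driver then merges the partial results. The same flow can be sketched in plain Python with the standard-library helper heapq.nsmallest, which the patch itself does not use; take_ordered_local and chunks are hypothetical names, and the key example reflects the behaviour after the final patch in this series (plain values rather than (key, value) tuples).

    import heapq

    def take_ordered_local(chunks, num, key=None):
        # Each "chunk" plays the role of one RDD partition: keep its num
        # smallest elements, mirroring topNKeyedElems.
        partials = [heapq.nsmallest(num, chunk, key=key) for chunk in chunks]
        # The merge step then keeps the num smallest of the union,
        # mirroring the reduce(merge) call on the driver.
        return heapq.nsmallest(num, [x for part in partials for x in part], key=key)

    print(take_ordered_local([[10, 1, 2, 9], [3, 4, 5, 6, 7]], 6))
    # [1, 2, 3, 4, 5, 6]
    print(take_ordered_local([[10, 1, 2, 9], [3, 4, 5, 6, 7]], 6, key=lambda x: -x))
    # [10, 9, 7, 6, 5, 4]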
From e8a08e2e5991ed168ae58f1fc8f7eaaf5855a6bc Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 2 Apr 2014 15:06:29 +0530
Subject: [PATCH 2/4] Code review comments.

---
 python/pyspark/rdd.py | 69 +++++++++++++++++++++++++------------------
 1 file changed, 40 insertions(+), 29 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 966e7e536fdaa..1d1ac5d0b520a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -30,7 +30,6 @@
 from threading import Thread
 import warnings
 import heapq
-import bisect
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
@@ -95,55 +94,70 @@ def __exit__(self, type, value, tb):
 class MaxHeapQ(object):
     """
     An implementation of MaxHeap.
-
+    >>> import pyspark.rdd
+    >>> heap = pyspark.rdd.MaxHeapQ(5)
+    >>> [heap.insert(i) for i in range(10)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> sorted(heap.getElements())
+    [0, 1, 2, 3, 4]
+    >>> heap = pyspark.rdd.MaxHeapQ(5)
+    >>> [heap.insert(i) for i in range(9, -1, -1)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> sorted(heap.getElements())
+    [0, 1, 2, 3, 4]
+    >>> heap = pyspark.rdd.MaxHeapQ(1)
+    >>> [heap.insert(i) for i in range(9, -1, -1)]
+    [None, None, None, None, None, None, None, None, None, None]
+    >>> heap.getElements()
+    [0]
     """
-
-    def __init__(self):
+
+    def __init__(self, maxsize):
         # we start from q[1], this makes calculating children as trivial as 2 * k
         self.q = [0]
-
+        self.maxsize = maxsize
+
     def _swim(self, k):
         while (k > 1) and (self.q[k/2] < self.q[k]):
             self._swap(k, k/2)
             k = k/2
-
+
     def _swap(self, i, j):
         t = self.q[i]
         self.q[i] = self.q[j]
         self.q[j] = t
 
     def _sink(self, k):
-        N=len(self.q)-1
-        while 2*k <= N:
-            j = 2*k
+        N = self.size()
+        while 2 * k <= N:
+            j = 2 * k
             # Here we test if both children are greater than parent
             # if not swap with larger one.
-            if j < N and self.q[j] < self.q[j+1]:
+            if j < N and self.q[j] < self.q[j + 1]:
                 j = j + 1
             if(self.q[k] > self.q[j]):
                 break
             self._swap(k, j)
             k = j
 
+    def size(self):
+        return len(self.q) - 1
+
     def insert(self, value):
-        self.q.append(value)
-        self._swim(len(self.q) - 1)
+        if (self.size()) < self.maxsize:
+            self.q.append(value)
+            self._swim(self.size())
+        else:
+            self._replaceRoot(value)
 
-    def getQ(self):
+    def getElements(self):
         return self.q[1:]
 
-    def replaceRoot(self, value):
+    def _replaceRoot(self, value):
         if(self.q[1] > value):
             self.q[1] = value
             self._sink(1)
 
-    def delMax(self):
-        r = self.q[1]
-        self.q[1] = self.q[len(self.q) - 1]
-        self.q.pop()
-        self._sink(1)
-        return r
-
 class RDD(object):
     """
     A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
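A side note on the reworked class above: after this revision MaxHeapQ is a bounded max-heap whose insert either grows the heap up to maxsize elements or evicts the current maximum, so it always holds the maxsize smallest items seen. A rough plain-Python equivalent using only the standard heapq module is sketched below; BoundedSmallestKeeper is a hypothetical name, and the negation trick it uses only works for numeric items, whereas the patch's hand-rolled max-heap also accepts the (key, value) tuples that takeOrdered pushes through it.

    import heapq

    class BoundedSmallestKeeper(object):
        """Keeps the maxsize smallest items seen, like the reworked MaxHeapQ,
        by storing negated values in Python's built-in min-heap."""

        def __init__(self, maxsize):
            self.maxsize = maxsize
            self._heap = []  # holds -item, so -self._heap[0] is the current maximum

        def insert(self, value):
            if len(self._heap) < self.maxsize:
                heapq.heappush(self._heap, -value)
            elif -self._heap[0] > value:
                # evict the current maximum when a smaller value arrives
                heapq.heapreplace(self._heap, -value)

        def getElements(self):
            return [-x for x in self._heap]

    keeper = BoundedSmallestKeeper(5)
    for i in range(10):
        keeper.insert(i)
    print(sorted(keeper.getElements()))  # [0, 1, 2, 3, 4], as in the doctest above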
@@ -778,15 +792,12 @@ def takeOrdered(self, num, key=None):
         """
 
         def topNKeyedElems(iterator, key_=None):
-            q = MaxHeapQ()
+            q = MaxHeapQ(num)
             for k in iterator:
-                if not (key_ == None):
+                if key_ != None:
                     k = (key_(k), k)
-                if (len(q.q) -1) < num:
-                    q.insert(k)
-                else:
-                    q.replaceRoot(k)
-            yield q.getQ()
+                q.insert(k)
+            yield q.getElements()
 
         def merge(a, b):
             return next(topNKeyedElems(a + b))

From 2b1124dc41cb91b23f0b687a94f235a00dad4bd9 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 2 Apr 2014 16:23:48 +0530
Subject: [PATCH 3/4] fixed tests

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1d1ac5d0b520a..8cf668f7988f8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -764,7 +764,7 @@ def top(self, num):
         >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
         [12]
         >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
-        [5, 6]
+        [6, 5]
         """
         def topIterator(iterator):
             q = []

From 35f86baaaed7ebb5985f8650b4ffe69c343dd735 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Thu, 3 Apr 2014 12:54:13 +0530
Subject: [PATCH 4/4] code review

---
 python/pyspark/rdd.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8cf668f7988f8..9943296b927dc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -788,7 +788,7 @@ def takeOrdered(self, num, key=None):
         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
         [1, 2, 3, 4, 5, 6]
         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
-        [(-10, 10), (-9, 9), (-7, 7), (-6, 6), (-5, 5), (-4, 4)]
+        [10, 9, 7, 6, 5, 4]
         """
 
         def topNKeyedElems(iterator, key_=None):
@@ -799,10 +799,15 @@ def topNKeyedElems(iterator, key_=None):
                 q.insert(k)
             yield q.getElements()
 
+        def unKey(x, key_=None):
+            if key_ != None:
+                x = [i[1] for i in x]
+            return x
+
         def merge(a, b):
             return next(topNKeyedElems(a + b))
-
-        return sorted(self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge))
+        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
+        return sorted(unKey(result, key), key=key)
 
 
     def take(self, num):
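To summarize what the last patch changes: when a key function is given, each element travels through the heap and the merge as a (key(x), x) pair, and the new unKey helper strips the keys off again before the final sort, so the caller gets plain values back (hence the corrected doctest). A minimal plain-Python sketch of that decorate/select/undecorate flow, with hypothetical variable names and sorted() standing in for the MaxHeapQ and merge steps:

    elems, num, key = [10, 1, 2, 9, 3, 4, 5, 6, 7], 6, lambda x: -x

    decorated = [(key(x), x) for x in elems]   # what topNKeyedElems builds per element
    selected = sorted(decorated)[:num]         # stand-in for the bounded heap + merge
    result = [x for (_, x) in selected]        # what unKey strips off again
    print(sorted(result, key=key))             # [10, 9, 7, 6, 5, 4], matching the doctest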