Skip to content

Commit ece1fa4

Browse files
committed
Added top in python.
1 parent 0283665 commit ece1fa4

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

python/pyspark/rdd.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from tempfile import NamedTemporaryFile
2929
from threading import Thread
3030
import warnings
31+
from heapq import heappush, heappop, heappushpop
3132

3233
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
3334
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -628,6 +629,30 @@ def mergeMaps(m1, m2):
628629
m1[k] += v
629630
return m1
630631
return self.mapPartitions(countPartition).reduce(mergeMaps)
632+
633+
def top(self, num):
634+
"""
635+
Get the top N elements from a RDD.
636+
637+
Note: It returns the list sorted in ascending order.
638+
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
639+
[12]
640+
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
641+
[5, 6]
642+
"""
643+
def topIterator(iterator):
644+
q = []
645+
for k in iterator:
646+
if len(q) < num:
647+
heappush(q, k)
648+
else:
649+
heappushpop(q, k)
650+
yield q
651+
652+
def merge(a, b):
653+
return next(topIterator(a + b))
654+
655+
return sorted(self.mapPartitions(topIterator).reduce(merge))
631656

632657
def take(self, num):
633658
"""

0 commit comments

Comments
 (0)