diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ae09dbff02a36..8c7f5074a7ce4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -25,6 +25,7 @@
 import sys
 import shlex
 import traceback
+from bisect import bisect_right
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
@@ -572,7 +573,6 @@ def func(iterator):
 
     # TODO: aggregate
 
-
     def max(self):
         """
         Find the maximum item in this RDD.
@@ -666,6 +666,68 @@ def sampleVariance(self):
         """
        return self.stats().sampleVariance()
 
+    def _getBuckets(self, bucketCount):
+        # Use the StatsCounter as a quick way of getting the max and min.
+        mm_stats = self.stats()
+        minVal = mm_stats.min()
+        maxVal = mm_stats.max()
+
+        increment = (maxVal - minVal) / bucketCount
+        # A zero increment means the range is too narrow to split into
+        # bucketCount buckets; return an empty boundary list in that case.
+        buckets = []
+        if increment != 0:
+            buckets = range(minVal, maxVal, increment)
+
+        return {"min": minVal, "max": maxVal, "buckets": buckets}
+
+    def histogram(self, bucketCount, buckets=None):
+        """
+        Compute a histogram of the data using bucketCount buckets evenly
+        spaced between the min and max of the RDD. If a sorted list of
+        bucket boundaries is passed in, it is used instead.
+
+        >>> sc.parallelize([1, 49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
+        defaultdict(<type 'int'>, {(67, 100): 2, (1, 33): 6, (34, 66): 2})
+        """
+        minVal = float("-inf")
+        maxVal = float("inf")
+        if not buckets:
+            b = self._getBuckets(bucketCount)
+            buckets = b["buckets"]
+            minVal = b["min"]
+            maxVal = b["max"]
+
+        if len(buckets) < 2:
+            raise ValueError("requires more than 1 bucket")
+
+        def histogramPartition(iterator):
+            # Count how many elements of this partition fall into each
+            # (lower, upper) bucket, locating the bucket by binary search.
+            counters = defaultdict(int)
+            for obj in iterator:
+                k = bisect_right(buckets, obj)
+                if 0 < k < len(buckets):
+                    key = (buckets[k - 1], buckets[k] - 1)
+                elif k == len(buckets):
+                    key = (buckets[k - 1], maxVal)
+                else:  # k == 0
+                    key = (minVal, buckets[k] - 1)
+                counters[key] += 1
+            yield counters
+
+        def mergeCounters(d1, d2):
+            # Merge per-partition counts, keeping keys that appear in only
+            # one of the two dictionaries.
+            for k in d2.keys():
+                if k in d1:
+                    d1[k] += d2[k]
+                else:
+                    d1[k] = d2[k]
+            return d1
+
+        return self.mapPartitions(histogramPartition).reduce(mergeCounters)
+
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of
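
Reviewer note: below is a minimal standalone sketch of the same bisect-and-merge logic, runnable without a SparkContext, to make the bucketing arithmetic easy to check. The partition lists, the helper names (histogram_partition, merge_counters), and the boundaries [1, 34, 67] (what _getBuckets would compute for the doctest data) are illustrative assumptions, not part of the patch.

from bisect import bisect_right
from collections import defaultdict
from functools import reduce

# Hypothetical boundaries matching the doctest: 3 buckets over [1, 100].
buckets = [1, 34, 67]
min_val, max_val = 1, 100

def histogram_partition(partition):
    # Count elements of one "partition" into (lower, upper) buckets,
    # mirroring the histogramPartition closure in the patch.
    counters = defaultdict(int)
    for obj in partition:
        k = bisect_right(buckets, obj)
        if 0 < k < len(buckets):
            key = (buckets[k - 1], buckets[k] - 1)
        elif k == len(buckets):
            key = (buckets[k - 1], max_val)
        else:  # k == 0
            key = (min_val, buckets[k] - 1)
        counters[key] += 1
    return counters

def merge_counters(d1, d2):
    # Combine two partition counters, as reduce() would on the cluster.
    for k, v in d2.items():
        d1[k] = d1.get(k, 0) + v
    return d1

# Two plain lists standing in for mapPartitions over a real RDD.
partitions = [[1, 49, 23, 100, 12], [13, 20, 22, 75, 50]]
result = reduce(merge_counters, map(histogram_partition, partitions))
print(dict(result))  # {(1, 33): 6, (34, 66): 2, (67, 100): 2}

The merged result matches the doctest output in the patch, which makes it straightforward to sanity-check any changes to the bucket-boundary rules.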