def histogram(self, buckets=None, evenBuckets=False, bucketCount=None):
    """
    Compute a histogram using the provided buckets or bucketCount. The
    buckets are all open to the right except for the last which is closed
    e.g. for the array [1, 10, 20, 50], the buckets are [1, 10), [10, 20),
    [20, 50] i.e. 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50
    we would have a histogram of 1, 0, 1.

    If bucketCount is supplied, evenly-spaced buckets are automatically
    constructed using the minimum and maximum of the RDD. For example if the
    min value is 0 and the max is 100 and there are two buckets the resulting
    buckets will be [0, 50) [50, 100]. bucketCount must be at least 1.
    If every element of the RDD has the same value, a single closed bucket
    [value, value] is returned regardless of bucketCount.
    Exactly one of buckets and bucketCount must be provided. Passing an
    int as the first argument is shorthand for bucketCount.

    Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can
    be switched from an O(log n) computation to O(1) per element (where n is
    the number of buckets) if you set evenBuckets to true.
    buckets must be sorted, must not contain any duplicates, and must
    contain at least two elements.

    Elements that fall outside the bucket range (including NaN) are not
    counted.

    Returns a tuple (buckets, counts) where counts[i] is the number of
    elements that fall in the bucket starting at buckets[i].

    Raises ValueError if neither or both of buckets and bucketCount are
    given, if bucketCount is not positive, or if buckets has fewer than two
    elements or is not strictly increasing.

    >>> a = sc.parallelize(range(100))
    >>> a.histogram(bucketCount=2)
    ([0.0, 49.5, 99.0], [50, 50])
    >>> a.histogram(3)
    ([0.0, 33.0, 66.0, 99.0], [33, 33, 34])
    >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90])
    ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11])
    >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True)
    ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11])
    >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99])
    ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99], [10, 10, 10, 10, 10, 10, 10, 10, 10, 10])

    """
    # Imported locally so the method carries its own dependency.
    from bisect import bisect_right

    either_msg = "Pass either buckets or bucketCount but not both"
    positive_msg = "bucketCount must be positive"

    # Treat an int argument as bucketCount, not buckets. This has to happen
    # BEFORE validation, otherwise histogram(-1) would skip the positivity
    # check below.
    if isinstance(buckets, int) and not isinstance(buckets, bool):
        if bucketCount:
            raise ValueError(either_msg)
        if buckets <= 0:
            raise ValueError(positive_msg)
        bucketCount, buckets = buckets, None

    if (buckets and bucketCount) or (not buckets and not bucketCount):
        raise ValueError(either_msg)

    if bucketCount and bucketCount <= 0:
        raise ValueError(positive_msg)

    if bucketCount:
        # Build evenly spaced edges from the observed range. This is done
        # inline (not in a nested function) so that `self` is not
        # referenced by any nested function defined in this method.
        summary = self.stats()
        lowest = summary.min()
        highest = summary.max()
        step = (highest - lowest) * 1.0 / bucketCount
        if step != 0:
            buckets = [lowest + k * step for k in range(bucketCount + 1)]
            evenBuckets = True
        else:
            # All elements are equal: one closed bucket [v, v]. The
            # constant-time path would divide by the zero bucket width,
            # so fall back to the bisect-based lookup.
            buckets = [lowest, highest]
            evenBuckets = False
    else:
        if len(buckets) < 2:
            raise ValueError("buckets array must have at least two elements")
        if any(left >= right for left, right in zip(buckets, buckets[1:])):
            raise ValueError(
                "buckets must be sorted and not contain any duplicates")

    last_index = len(buckets) - 2  # index of the final (closed) bucket

    if evenBuckets:
        origin = buckets[0]
        width = buckets[1] - buckets[0]
        bucket_total = last_index + 1

        def bucket_of(e):
            # * 1.0 avoids Python 2 integer division.
            position = (e - origin) * 1.0 / width
            # Written as a positive range test so that NaN (for which
            # every comparison is False) is rejected instead of reaching
            # int(), which would raise.
            if not (0 <= position <= bucket_total):
                return None
            # position == bucket_total means e sits on the closed right
            # edge of the last bucket.
            return min(int(position), last_index)
    else:
        def bucket_of(e):
            loc = bisect_right(buckets, e)
            if 0 < loc < len(buckets):
                return loc - 1
            if loc == len(buckets) and e == buckets[-1]:
                # last bucket is closed on the right
                return last_index
            return None

    def count_partition(iterator):
        # One list of per-bucket counts per partition, wrapped in a list
        # so mapPartitions yields it as a single element.
        counters = [0] * (last_index + 1)
        for element in iterator:
            index = bucket_of(element)
            if index is not None:
                counters[index] += 1
        return [counters]

    def merge_counters(left, right):
        return [x + y for x, y in zip(left, right)]

    return (buckets, self.mapPartitions(count_partition).reduce(merge_counters))