From aecb5bcccbec0df2c378be4b03172c7c5ac63364 Mon Sep 17 00:00:00 2001 From: Dan McClary Date: Mon, 10 Mar 2014 23:58:32 -0700 Subject: [PATCH 1/7] added histogram method, added max and min to statscounter --- python/pyspark/rdd.py | 61 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 309f5a9b6038d..7797c01fb1e91 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -25,6 +25,7 @@ import sys import shlex import traceback +from bisect import bisect_right from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread @@ -902,6 +903,66 @@ def sampleVariance(self): """ return self.stats().sampleVariance() + def _getBuckets(self, bucketCount): + #use the statscounter as a quick way of getting max and min + mm_stats = self.stats() + min = mm_stats.min() + max = mm_stats.max() + + increment = (max-min)/bucketCount + buckets = range(min,min) + if increment != 0: + buckets = range(min,max, increment) + + return {"min":min, "max":max, "buckets":buckets} + + def histogram(self, bucketCount, buckets=None): + """ + Compute a histogram of the data using bucketCount number of buckets + evenly spaced between the min and max of the RDD. + + >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3) + defaultdict(, {(67, 100): 2, (1, 33): 6, (34, 66): 2}) + """ + min = float("-inf") + max = float("inf") + evenBuckets = False + if not buckets: + b = self._getBuckets(bucketCount) + buckets = b["buckets"] + min = b["min"] + max = b["max"] + + if len(buckets) < 2: + raise ValueError("requires more than 1 bucket") + if len(buckets) % 2 == 0: + evenBuckets = True + # histogram partition + def histogramPartition(iterator): + counters = defaultdict(int) + for obj in iterator: + k = bisect_right(buckets, obj) + if k < len(buckets) and k > 0: + key = (buckets[k-1], buckets[k]-1) + elif k == len(buckets): + key = (buckets[k-1], max) + elif k == 0: + key = (min, buckets[k]-1) + print obj, k, key + counters[key] += 1 + yield counters + + # merge counters + def mergeCounters(d1, d2): + for k in d2.keys(): + if k in d1: + d1[k] += d2[k] + return d1 + + #map partitions(histogram_partition(bucketFunction)).reduce(mergeCounters) + return self.mapPartitions(histogramPartition).reduce(mergeCounters) + + def countByValue(self): """ Return the count of each unique value in this RDD as a dictionary of From 0c2bbdd2956db71d227f89822f246293fc6e6458 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 11 Mar 2014 13:21:22 +0530 Subject: [PATCH 2/7] SPARK-1170 Added histogram(buckets) to pyspark and not histogram(noOfBuckets). --- python/pyspark/rdd.py | 136 ++++++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 57 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 7797c01fb1e91..98bd1914557fa 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -47,6 +47,7 @@ get_used_memory from py4j.java_collections import ListConverter, MapConverter +from bisect import bisect_left __all__ = ["RDD"] @@ -902,66 +903,87 @@ def sampleVariance(self): 1.0 """ return self.stats().sampleVariance() + + def histogram(self, buckets=None, evenBuckets=False, bucketCount=None): + """ + Compute a histogram using the provided buckets. The buckets are all open + to the left except for the last which is closed e.g. for the array + [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50] i.e. 1<=x<10, + 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a + histogram of 1, 0, 1. + + If bucketCount is supplied, evenly-spaced buckets are automatically + constructed using the minimum and maximum of the RDD. For example if the + min value is 0 and the max is 100 and there are two buckets the resulting + buckets will be [0, 50) [50, 100]. bucketCount must be at least 1. + Exactly one of buckets and bucketCount must be provided. + + Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can + be switched from an O(log n) computation to O(1) per element (where n is + the number of buckets) if you set evenBuckets to true. + buckets must be sorted and not contain any duplicates. + buckets array must be at least two elements + + >>> a = sc.parallelize(range(100)) + >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True) + [10, 10, 10, 10, 10, 10, 10, 10, 11] + >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90]) + [10, 10, 10, 10, 10, 10, 10, 10, 11] + >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99]) + [10, 10, 10, 10, 10, 10, 10, 10, 10, 10] + """ + + if (buckets and bucketCount) or (not buckets and not bucketCount): + raise ValueError("Pass either buckets or bucketCount but not both") + + if bucketCount <= 0: + raise ValueError("bucketCount must be positive") + + def getBuckets(): + #use the statscounter as a quick way of getting max and min + mm_stats = self.stats() + min = mm_stats.min() + max = mm_stats.max() + increment = (max - min) / bucketCount + if increment != 0: + buckets = range(min, max, increment) + else: + buckets = [min, max] + return buckets - def _getBuckets(self, bucketCount): - #use the statscounter as a quick way of getting max and min - mm_stats = self.stats() - min = mm_stats.min() - max = mm_stats.max() - - increment = (max-min)/bucketCount - buckets = range(min,min) - if increment != 0: - buckets = range(min,max, increment) - - return {"min":min, "max":max, "buckets":buckets} - - def histogram(self, bucketCount, buckets=None): - """ - Compute a histogram of the data using bucketCount number of buckets - evenly spaced between the min and max of the RDD. - - >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3) - defaultdict(, {(67, 100): 2, (1, 33): 6, (34, 66): 2}) - """ - min = float("-inf") - max = float("inf") - evenBuckets = False - if not buckets: - b = self._getBuckets(bucketCount) - buckets = b["buckets"] - min = b["min"] - max = b["max"] - - if len(buckets) < 2: - raise ValueError("requires more than 1 bucket") - if len(buckets) % 2 == 0: - evenBuckets = True - # histogram partition def histogramPartition(iterator): - counters = defaultdict(int) - for obj in iterator: - k = bisect_right(buckets, obj) - if k < len(buckets) and k > 0: - key = (buckets[k-1], buckets[k]-1) - elif k == len(buckets): - key = (buckets[k-1], max) - elif k == 0: - key = (min, buckets[k]-1) - print obj, k, key - counters[key] += 1 - yield counters - - # merge counters - def mergeCounters(d1, d2): - for k in d2.keys(): - if k in d1: - d1[k] += d2[k] - return d1 - - #map partitions(histogram_partition(bucketFunction)).reduce(mergeCounters) - return self.mapPartitions(histogramPartition).reduce(mergeCounters) + counters = [0 for i in range(len(buckets) - 1)] + for i in iterator: + if evenBuckets: + t = fastBucketFunction(buckets[0], buckets[1] - buckets[0], len(buckets), i) + else: + t = basicBucketFunction(i) + if t: + counters[t] += 1 + return [counters] + + def mergeCounters(a1, a2): + for i in range(len(a1)): + a1[i] = a1[i] + a2[i] + return a1 + + def basicBucketFunction(e): + loc = bisect_left(buckets, e, 0, len(buckets)) + if loc > 0 and loc < len(buckets): + return loc - 1 + else: + return None + + def fastBucketFunction(minimum, inc, count, e): + bucketNumber = (e - minimum) // inc + if (bucketNumber >= count or bucketNumber < 0): + return None + return min(bucketNumber, count -1) + if bucketCount: + evenBuckets = True + buckets = getBuckets() + return self.mapPartitions(lambda x: histogramPartition(x)).reduce(mergeCounters) def countByValue(self): """ From 8427db65e134d82247cb2f719c26c78de5535db3 Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Tue, 5 Aug 2014 12:25:06 +0530 Subject: [PATCH 3/7] SPARK-1170. Merged commits and fixed bugs in both the original commits --- python/pyspark/rdd.py | 51 ++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 98bd1914557fa..008c3d56b46a8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -906,11 +906,11 @@ def sampleVariance(self): def histogram(self, buckets=None, evenBuckets=False, bucketCount=None): """ - Compute a histogram using the provided buckets. The buckets are all open - to the left except for the last which is closed e.g. for the array - [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50] i.e. 1<=x<10, - 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a - histogram of 1, 0, 1. + Compute a histogram using the provided buckets or bucketCount. The + buckets are all open to the left except for the last which is closed + e.g. for the array [1, 10, 20, 50], the buckets are [1, 10), [10, 20), + [20, 50] i.e. 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 + we would have a histogram of 1, 0, 1. If bucketCount is supplied, evenly-spaced buckets are automatically constructed using the minimum and maximum of the RDD. For example if the @@ -925,28 +925,36 @@ def histogram(self, buckets=None, evenBuckets=False, bucketCount=None): buckets array must be at least two elements >>> a = sc.parallelize(range(100)) - >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True) - [10, 10, 10, 10, 10, 10, 10, 10, 11] + >>> a.histogram(bucketCount=2) + ([0.0, 49.5, 99.0], [50, 50]) + >>> a.histogram(3) + ([0.0, 33.0, 66.0, 99.0], [33, 33, 34]) >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90]) - [10, 10, 10, 10, 10, 10, 10, 10, 11] + ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11]) + >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True) + ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11]) >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99]) - [10, 10, 10, 10, 10, 10, 10, 10, 10, 10] + ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99], + [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]) + """ if (buckets and bucketCount) or (not buckets and not bucketCount): raise ValueError("Pass either buckets or bucketCount but not both") - if bucketCount <= 0: + if bucketCount and bucketCount <= 0: raise ValueError("bucketCount must be positive") + if type(buckets) == int: #Treat int argument as bucketCount, not buckets + bucketCount = buckets + def getBuckets(): - #use the statscounter as a quick way of getting max and min mm_stats = self.stats() min = mm_stats.min() max = mm_stats.max() - increment = (max - min) / bucketCount + increment = (max - min) * 1.0 / bucketCount if increment != 0: - buckets = range(min, max, increment) + buckets = [round(min+x*increment, 2) for x in range(bucketCount+1)] else: buckets = [min, max] return buckets @@ -955,10 +963,10 @@ def histogramPartition(iterator): counters = [0 for i in range(len(buckets) - 1)] for i in iterator: if evenBuckets: - t = fastBucketFunction(buckets[0], buckets[1] - buckets[0], len(buckets), i) + t = fastBucketFunction(buckets[0], buckets[1] - buckets[0], len(buckets)-1, i) else: t = basicBucketFunction(i) - if t: + if not t == None: counters[t] += 1 return [counters] @@ -968,22 +976,25 @@ def mergeCounters(a1, a2): return a1 def basicBucketFunction(e): - loc = bisect_left(buckets, e, 0, len(buckets)) + loc = bisect_right(buckets, e, 0, len(buckets)) if loc > 0 and loc < len(buckets): return loc - 1 + elif loc == len(buckets) and e == buckets[loc-1]: + # last bucket is closed on the right + return loc - 2 else: return None def fastBucketFunction(minimum, inc, count, e): - bucketNumber = (e - minimum) // inc - if (bucketNumber >= count or bucketNumber < 0): + bucketNumber = (e - minimum) * 1.0 / inc # avoid Python integer div + if (bucketNumber > count or bucketNumber < 0): return None - return min(bucketNumber, count -1) + return min(int(bucketNumber), count -1) if bucketCount: evenBuckets = True buckets = getBuckets() - return self.mapPartitions(lambda x: histogramPartition(x)).reduce(mergeCounters) + return (buckets, self.mapPartitions(lambda x: histogramPartition(x)).reduce(mergeCounters)) def countByValue(self): """ From bdd3d7afdbc0fe92e7600312ac8c74486c5f22d0 Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Tue, 5 Aug 2014 13:36:01 +0530 Subject: [PATCH 4/7] SPARK-1170. Merged commits and fixed bugs in both the original commits --- python/pyspark/rdd.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 008c3d56b46a8..a3d79c797479e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -47,7 +47,6 @@ get_used_memory from py4j.java_collections import ListConverter, MapConverter -from bisect import bisect_left __all__ = ["RDD"] From 7fe070a5947d4f5b4c8d1d65853c7f3d395f5b4b Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Tue, 5 Aug 2014 14:26:17 +0530 Subject: [PATCH 5/7] SPARK-1170. Fix a test case. --- python/pyspark/rdd.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a3d79c797479e..8a5539dbb603f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -933,8 +933,7 @@ def histogram(self, buckets=None, evenBuckets=False, bucketCount=None): >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True) ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11]) >>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99]) - ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99], - [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]) + ([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99], [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]) """ From 7b522d5ab9150a1b2da34031003749f2f6adc09a Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Tue, 5 Aug 2014 15:34:08 +0530 Subject: [PATCH 6/7] [SPARK-1170] Remove unnecessary rounding --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8a5539dbb603f..a2e2f693e8313 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -952,7 +952,7 @@ def getBuckets(): max = mm_stats.max() increment = (max - min) * 1.0 / bucketCount if increment != 0: - buckets = [round(min+x*increment, 2) for x in range(bucketCount+1)] + buckets = [min+x*increment for x in range(bucketCount+1)] else: buckets = [min, max] return buckets From c8dd62553514886a0f79727ea27c323fc83643bf Mon Sep 17 00:00:00 2001 From: Chandan Kumar Date: Tue, 5 Aug 2014 20:04:42 +0530 Subject: [PATCH 7/7] SPARK-1170. Fix a typo in doc comment. --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a2e2f693e8313..8800fb36ee5ac 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -906,7 +906,7 @@ def sampleVariance(self): def histogram(self, buckets=None, evenBuckets=False, bucketCount=None): """ Compute a histogram using the provided buckets or bucketCount. The - buckets are all open to the left except for the last which is closed + buckets are all open to the right except for the last which is closed e.g. for the array [1, 10, 20, 50], the buckets are [1, 10), [10, 20), [20, 50] i.e. 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50 we would have a histogram of 1, 0, 1.