Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import sys
import shlex
import traceback
from bisect import bisect_right
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
Expand Down Expand Up @@ -901,6 +902,97 @@ def sampleVariance(self):
1.0
"""
return self.stats().sampleVariance()

def histogram(self, buckets=None, evenBuckets=False, bucketCount=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can define this API as

def histogram(self, buckets, even=False):

buckets can be list or int.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Why didn't I come up with it :)

"""
Compute a histogram using the provided buckets or bucketCount. The
buckets are all open to the right except for the last which is closed
e.g. for the array [1, 10, 20, 50], the buckets are [1, 10), [10, 20),
[20, 50] i.e. 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 and 50
we would have a histogram of 1, 0, 1.

If bucketCount is supplied, evenly-spaced buckets are automatically
constructed using the minimum and maximum of the RDD. For example if the
min value is 0 and the max is 100 and there are two buckets the resulting
buckets will be [0, 50) [50, 100]. bucketCount must be at least 1.
Exactly one of buckets and bucketCount must be provided.

Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can
be switched from an O(log n) computation to O(1) per element (where n is
the number of buckets) if you set evenBuckets to true.
buckets must be sorted and not contain any duplicates.
buckets array must be at least two elements

>>> a = sc.parallelize(range(100))
>>> a.histogram(bucketCount=2)
([0.0, 49.5, 99.0], [50, 50])
>>> a.histogram(3)
([0.0, 33.0, 66.0, 99.0], [33, 33, 34])
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90])
([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11])
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], evenBuckets=True)
([0, 10, 20, 30, 40, 50, 60, 70, 80, 90], [10, 10, 10, 10, 10, 10, 10, 10, 11])
>>> a.histogram([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99])
([0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99], [10, 10, 10, 10, 10, 10, 10, 10, 10, 10])

"""

if (buckets and bucketCount) or (not buckets and not bucketCount):
raise ValueError("Pass either buckets or bucketCount but not both")

if bucketCount and bucketCount <= 0:
raise ValueError("bucketCount must be positive")

if type(buckets) == int: #Treat int argument as bucketCount, not buckets
bucketCount = buckets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add even=True


def getBuckets():
mm_stats = self.stats()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it stats for short

min = mm_stats.min()
max = mm_stats.max()
increment = (max - min) * 1.0 / bucketCount
if increment != 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if increment:

buckets = [min+x*increment for x in range(bucketCount+1)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last one is buckets should be max.

else:
buckets = [min, max]
return buckets

def histogramPartition(iterator):
counters = [0 for i in range(len(buckets) - 1)]
for i in iterator:
if evenBuckets:
t = fastBucketFunction(buckets[0], buckets[1] - buckets[0], len(buckets)-1, i)
else:
t = basicBucketFunction(i)
if not t == None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t is not None

counters[t] += 1
return [counters]

def mergeCounters(a1, a2):
for i in range(len(a1)):
a1[i] = a1[i] + a2[i]
return a1

def basicBucketFunction(e):
loc = bisect_right(buckets, e, 0, len(buckets))
if loc > 0 and loc < len(buckets):
return loc - 1
elif loc == len(buckets) and e == buckets[loc-1]:
# last bucket is closed on the right
return loc - 2
else:
return None

def fastBucketFunction(minimum, inc, count, e):
bucketNumber = (e - minimum) * 1.0 / inc # avoid Python integer div
if (bucketNumber > count or bucketNumber < 0):
return None
return min(int(bucketNumber), count -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The right of bucket is open, so if bucketNumber is integer, the return value should be int(bucketNumer) - 1

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies This part of the code is taken straight from the Scala version of the histogram API. I will investigate and get back to you.


if bucketCount:
evenBuckets = True
buckets = getBuckets()
return (buckets, self.mapPartitions(lambda x: histogramPartition(x)).reduce(mergeCounters))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return buckets always? The return value should be documented.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies Scala histogram method has two signatures. One of them return a pair of Arrays. The other one returns only the histogram values array. Bringing that to Python would mean that the same function returns different data structure depending on argument. Would that be a good idea?


def countByValue(self):
"""
Expand Down