Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 128 additions & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import heapq
import bisect
from random import Random
from math import sqrt, log
from math import sqrt, log, isinf, isnan

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
Expand Down Expand Up @@ -856,6 +856,133 @@ def redFunc(left_counter, right_counter):

return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def histogram(self, buckets):
        """
        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last which is closed.
        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
        and 50 we would have a histogram of 1,0,1.

        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
        this can be switched from an O(log n) insertion to O(1) per
        element (where n = # buckets).

        Buckets must be sorted, must not contain any duplicates, and
        must have at least two elements.

        If `buckets` is a number, it will generate buckets which are
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given buckets
        as 2, the resulting buckets will be [0,50) [50,100]. buckets must
        be at least 1. If the RDD contains infinity or NaN, an exception
        is thrown. If the elements in the RDD do not vary (max == min)
        a single bucket is always returned.

        It will return a tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
        >>> rdd.histogram(("a", "b", "c"))
        (('a', 'b', 'c'), [2, 2])
        """

        if isinstance(buckets, (int, long)):
            if buckets < 1:
                raise ValueError("number of buckets must be >= 1")

            # filter out non-comparable elements (None and NaN would
            # poison the min/max reduction below)
            def comparable(x):
                if x is None:
                    return False
                if type(x) is float and isnan(x):
                    return False
                return True

            filtered = self.filter(comparable)

            # faster than stats(): a single reduce carrying (min, max) pairs
            def minmax(a, b):
                return min(a[0], b[0]), max(a[1], b[1])
            try:
                minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
            except TypeError as e:
                # NOTE(review): relies on the wording of the error raised by
                # reduce() on an empty RDD — confirm against RDD.reduce
                if " empty " in str(e):
                    raise ValueError("can not generate buckets from empty RDD")
                raise

            # degenerate range (or a single requested bucket): one bucket
            # covering everything
            if minv == maxv or buckets == 1:
                return [minv, maxv], [filtered.count()]

            try:
                inc = (maxv - minv) / buckets
            except TypeError:
                raise TypeError("Can not generate buckets with non-number in RDD")

            if isinf(inc):
                raise ValueError("Can not generate buckets with infinite value")

            # keep them as integer if possible; fall back to float division
            # when the range does not divide evenly (Python 2 `/` truncates
            # for ints, hence the explicit check)
            if inc * buckets != maxv - minv:
                inc = (maxv - minv) * 1.0 / buckets

            buckets = [i * inc + minv for i in range(buckets)]
            buckets.append(maxv)  # fix accumulated error
            even = True

        elif isinstance(buckets, (list, tuple)):
            if len(buckets) < 2:
                raise ValueError("buckets should have more than one value")

            if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
                raise ValueError("can not have None or NaN in buckets")

            if sorted(buckets) != list(buckets):
                raise ValueError("buckets should be sorted")

            if len(set(buckets)) != len(buckets):
                raise ValueError("buckets should not contain duplicated values")

            minv = buckets[0]
            maxv = buckets[-1]
            even = False
            inc = None
            # detect evenly-spaced buckets so counting can use the O(1)
            # index formula instead of a bisect per element
            try:
                steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
            except TypeError:
                pass  # objects in buckets do not support '-'
            else:
                if max(steps) - min(steps) < 1e-10:  # handle precision errors
                    even = True
                    inc = (maxv - minv) / (len(buckets) - 1)

        else:
            raise TypeError("buckets should be a list or tuple or number(int or long)")

        # per-partition counting; None/NaN and out-of-range values are skipped
        def histogram(iterator):
            counters = [0] * len(buckets)
            for i in iterator:
                if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
                    continue
                t = (int((i - minv) / inc) if even
                     else bisect.bisect_right(buckets, i) - 1)
                counters[t] += 1
            # add last two together: the final bucket is closed on the
            # right, so values equal to maxv fold into it
            last = counters.pop()
            counters[-1] += last
            return [counters]

        def mergeCounters(a, b):
            return [i + j for i, j in zip(a, b)]

        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)

def mean(self):
"""
Compute the mean of this RDD's elements.
Expand Down
104 changes: 104 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,110 @@ def test_zip_with_different_number_of_items(self):
self.assertEquals(a.count(), b.count())
self.assertRaises(Exception, lambda: a.zip(b).count())

def test_histogram(self):
# empty
rdd = self.sc.parallelize([])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
self.assertRaises(ValueError, lambda: rdd.histogram(1))

# out of range
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0], rdd.histogram([0, 10])[1])
self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])

# in range with one bucket
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals([4], rdd.histogram([0, 10])[1])
self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])

# in range with one bucket exact match
self.assertEquals([4], rdd.histogram([1, 4])[1])

# out of range with two buckets
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])

# out of range with two uneven buckets
rdd = self.sc.parallelize([10.01, -0.01])
self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])

# in range with two buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])

# in range with two bucket and None
rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])

# in range with two uneven buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])

# mixed range with two uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])

# mixed range with four uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])

# mixed range with uneven buckets and NaN
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
199.0, 200.0, 200.1, None, float('nan')])
self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])

# out of range with infinite buckets
rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])

# invalid buckets
self.assertRaises(ValueError, lambda: rdd.histogram([]))
self.assertRaises(ValueError, lambda: rdd.histogram([1]))
self.assertRaises(ValueError, lambda: rdd.histogram(0))
self.assertRaises(TypeError, lambda: rdd.histogram({}))

# without buckets
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals(([1, 4], [4]), rdd.histogram(1))

# without buckets single element
rdd = self.sc.parallelize([1])
self.assertEquals(([1, 1], [1]), rdd.histogram(1))

# without bucket no range
rdd = self.sc.parallelize([1] * 4)
self.assertEquals(([1, 1], [4]), rdd.histogram(1))

# without buckets basic two
rdd = self.sc.parallelize(range(1, 5))
self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))

# without buckets with more requested than elements
rdd = self.sc.parallelize([1, 2])
buckets = [1 + 0.2 * i for i in range(6)]
hist = [1, 0, 0, 0, 1]
self.assertEquals((buckets, hist), rdd.histogram(5))

# invalid RDDs
rdd = self.sc.parallelize([1, float('inf')])
self.assertRaises(ValueError, lambda: rdd.histogram(2))
rdd = self.sc.parallelize([float('nan')])
self.assertRaises(ValueError, lambda: rdd.histogram(2))

# string
rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))

# mixed RDD
rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))


class TestIO(PySparkTestCase):

Expand Down