Skip to content

Commit ded624f

Browse files
committed
calculate hash in Python
1 parent 4cba98f commit ded624f

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

python/pyspark/rdd.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,9 +2008,6 @@ def countApproxDistinct(self, relativeSD=0.05):
20082008
of The Art Cardinality Estimation Algorithm", available
20092009
<a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
20102010
2011-
This supports all the types of objects that are supported by
2012-
Pyrolite, nearly all builtin types.
2013-
20142011
@param relativeSD Relative accuracy. Smaller values create
20152012
counters that require more space.
20162013
It must be greater than 0.000017.
@@ -2026,7 +2023,13 @@ def countApproxDistinct(self, relativeSD=0.05):
20262023
raise ValueError("relativeSD should be greater than 0.000017")
20272024
if relativeSD > 0.37:
20282025
raise ValueError("relativeSD should be smaller than 0.37")
2029-
return self._to_java_object_rdd().countApproxDistinct(relativeSD)
2026+
hashRDD = self.map(lambda x: portable_hash(x) % sys.maxint)
2027+
c = hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
2028+
# range of hash is [0, sys.maxint), because it is taken modulo sys.maxint
2029+
if c > sys.maxint / 30:
2030+
# correction for hash collision in Python
2031+
c = -sys.maxint * log(1 - float(c) / sys.maxint)
2032+
return int(c)
20302033

20312034

20322035
class PipelinedRDD(RDD):

python/pyspark/tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,13 +409,13 @@ def test_count_approx_distinct(self):
409409
self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
410410
self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
411411
self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
412-
self.assertTrue(950 < rdd.map(lambda x: set([x, -x])).countApproxDistinct(0.04) < 1050)
412+
self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
413413

414414
rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
415415
self.assertTrue(18 < rdd.countApproxDistinct() < 22)
416416
self.assertTrue(18 < rdd.map(float).countApproxDistinct() < 22)
417417
self.assertTrue(18 < rdd.map(str).countApproxDistinct() < 22)
418-
self.assertTrue(18 < rdd.map(lambda x: set([x, -x])).countApproxDistinct() < 22)
418+
self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22)
419419

420420
self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
421421
self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))

0 commit comments

Comments
 (0)