From e97e342dac1cbbad2e424a39159a6c7f3fa63bf4 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 26 Aug 2014 11:49:34 -0700
Subject: [PATCH 1/8] add countApproxDistinct()

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 python/pyspark/rdd.py                         | 33 +++++++++++++++++--
 python/pyspark/tests.py                       |  9 +++++
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index daea2617e62ea..af9e31ba7b720 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -993,7 +993,7 @@ abstract class RDD[T: ClassTag](
    */
   @Experimental
   def countApproxDistinct(p: Int, sp: Int): Long = {
-    require(p >= 4, s"p ($p) must be greater than 0")
+    require(p >= 4, s"p ($p) must be at least 4")
     require(sp <= 32, s"sp ($sp) cannot be greater than 32")
     require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
     val zeroCounter = new HyperLogLogPlus(p, sp)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1374f74968c9e..e373bef72cc2b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1864,7 +1864,7 @@ def _is_pickled(self):
             return True
         return False
 
-    def _to_jrdd(self):
+    def _to_java_object_rdd(self):
         """ Return an JavaRDD of Object by unpickling
 
         It will convert each Python object into Java object by Pyrolite, whenever the
@@ -1899,7 +1899,7 @@ def sumApprox(self, timeout, confidence=0.95):
         >>> (rdd.sumApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
@@ -1915,11 +1915,38 @@ def meanApprox(self, timeout, confidence=0.95):
         >>> (rdd.meanApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.map(float)._to_jrdd()
+        jrdd = self.map(float)._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
 
+    def countApproxDistinct(self, relativeSD=0.05):
+        """
+        :: Experimental ::
+        Return approximate number of distinct elements in the RDD.
+
+        The algorithm used is based on streamlib's implementation of
+        "HyperLogLog in Practice: Algorithmic Engineering of a State
+        of The Art Cardinality Estimation Algorithm", available
+        here.
+
+        This supports all types of objects that are supported by
+        Pyrolite, i.e. nearly all builtin types.
+
+        @param relativeSD Relative accuracy. Smaller values create
+                          counters that require more space.
+                          It must be greater than 0.000017.
+
+        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+        >>> 950 < n < 1050
+        True
+        """
+        if relativeSD < 0.000017:
+            raise ValueError("relativeSD should be greater than 0.000017")
+        if relativeSD > 0.37:
+            raise ValueError("relativeSD should be smaller than 0.37")
+        return self._to_java_object_rdd().countApproxDistinct(relativeSD)
+
 
 class PipelinedRDD(RDD):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 51bfbb47e53c2..8fa1a776aa982 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -364,6 +364,15 @@ def test_zip_with_different_number_of_items(self):
         self.assertEquals(a.count(), b.count())
         self.assertRaises(Exception, lambda: a.zip(b).count())
 
+    def test_count_approx_distinct(self):
+        rdd = self.sc.parallelize(range(1000))
+        self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))
+
 class TestIO(PySparkTestCase):
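Patch 1 is a thin wrapper: _to_jrdd() is renamed to the more descriptive _to_java_object_rdd(), the Python objects are converted into a JavaRDD of Java objects through Pyrolite, and the JVM-side countApproxDistinct() (streamlib's HyperLogLog++) does the actual counting. A minimal usage sketch of the new Python API (it assumes only a running SparkContext bound to `sc`; the numbers are illustrative):

    # Approximate vs. exact distinct count (sketch; assumes a live SparkContext `sc`).
    rdd = sc.parallelize(range(10000)).map(str)
    approx = rdd.countApproxDistinct(relativeSD=0.05)  # HyperLogLog++ estimate
    exact = rdd.distinct().count()                     # exact, but shuffles the data
    print(approx, exact)  # approx is typically within ~5% of 10000

Smaller relativeSD values buy a tighter estimate at the cost of a larger HyperLogLog sketch.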
From 4cba98f05f4d3d8373be577946d0f81b44bedb8b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 27 Aug 2014 20:23:34 -0700
Subject: [PATCH 2/8] add more tests

---
 python/pyspark/rdd.py   | 3 +++
 python/pyspark/tests.py | 9 ++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 45b9f70c9339f..a249cf325ed92 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2018,6 +2018,9 @@ def countApproxDistinct(self, relativeSD=0.05):
         >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
         >>> 950 < n < 1050
         True
+        >>> n = self.sc.parallelize([i % 20 for i in range(1000)])
+        >>> 18 < n < 22
+        True
         """
         if relativeSD < 0.000017:
             raise ValueError("relativeSD should be greater than 0.000017")
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 39fee03542613..8dfb76682457f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -409,7 +409,14 @@ def test_count_approx_distinct(self):
         self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
         self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
         self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
-        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: set([x, -x])).countApproxDistinct(0.04) < 1050)
+
+        rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
+        self.assertTrue(18 < rdd.countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(float).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(str).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(lambda x: set([x, -x])).countApproxDistinct() < 22)
+
         self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
         self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))
From ded624f11ece4bb53a2b56a3d02697b428f94a8e Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 28 Aug 2014 12:16:38 -0700
Subject: [PATCH 3/8] calculate hash in Python

---
 python/pyspark/rdd.py   | 11 +++++++----
 python/pyspark/tests.py |  4 ++--
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a249cf325ed92..9387eefc43b5e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2008,9 +2008,6 @@ def countApproxDistinct(self, relativeSD=0.05):
         of The Art Cardinality Estimation Algorithm", available
         here.
 
-        This supports all types of objects that are supported by
-        Pyrolite, i.e. nearly all builtin types.
-
         @param relativeSD Relative accuracy. Smaller values create
                           counters that require more space.
                           It must be greater than 0.000017.
@@ -2026,7 +2023,13 @@ def countApproxDistinct(self, relativeSD=0.05):
             raise ValueError("relativeSD should be greater than 0.000017")
         if relativeSD > 0.37:
             raise ValueError("relativeSD should be smaller than 0.37")
-        return self._to_java_object_rdd().countApproxDistinct(relativeSD)
+        hashRDD = self.map(lambda x: portable_hash(x) % sys.maxint)
+        c = hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
+        # range of hash is [0, sys.maxint]
+        if c > sys.maxint / 30:
+            # correction for hash collision in Python
+            c = -sys.maxint * log(1 - float(c) / sys.maxint)
+        return int(c)
 
 
 class PipelinedRDD(RDD):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8dfb76682457f..f1a75cbff5c19 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -409,13 +409,13 @@ def test_count_approx_distinct(self):
         self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
         self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
         self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
-        self.assertTrue(950 < rdd.map(lambda x: set([x, -x])).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
 
         rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
         self.assertTrue(18 < rdd.countApproxDistinct() < 22)
         self.assertTrue(18 < rdd.map(float).countApproxDistinct() < 22)
         self.assertTrue(18 < rdd.map(str).countApproxDistinct() < 22)
-        self.assertTrue(18 < rdd.map(lambda x: set([x, -x])).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22)
 
         self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
         self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))

From d30649283cb20f00dbd1ec0c25bdbc4815cf52d6 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 28 Aug 2014 12:26:41 -0700
Subject: [PATCH 4/8] change range of hash of tuple to [0, maxint]

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9387eefc43b5e..85c625c27bfd9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -72,7 +72,7 @@ def portable_hash(x):
         for i in x:
             h ^= portable_hash(i)
             h *= 1000003
-            h &= 0xffffffff
+            h &= sys.maxint
         h ^= len(x)
         if h == -1:
             h = -2

From 9d2565fe6c6d1956e66f830c7e8c3a255998e6b1 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 28 Aug 2014 12:43:34 -0700
Subject: [PATCH 5/8] add comments and link for hash collision correction

---
 python/pyspark/rdd.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 85c625c27bfd9..a6e56e8796582 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2027,8 +2027,10 @@ def countApproxDistinct(self, relativeSD=0.05):
         c = hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
         # range of hash is [0, sys.maxint]
         if c > sys.maxint / 30:
-            # correction for hash collision in Python
-            c = -sys.maxint * log(1 - float(c) / sys.maxint)
+            # correction for hash collision in Python,
+            # hash collision probability is 1 - exp(-X), so X = - log(1 - p)
+            # see http://preshing.com/20110504/hash-collision-probabilities/
+            c = - sys.maxint * log(1 - float(c) / sys.maxint)
         return int(c)
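Patches 3 through 5 move the hashing into Python, so that anything portable_hash() accepts can be counted, and then correct for collisions that happen before the HyperLogLog sketch ever sees the data: hashing n distinct values uniformly into M = sys.maxint buckets occupies about M * (1 - exp(-n / M)) of them, and the added formula inverts that expectation to recover n. The `c > sys.maxint / 30` guard skips the inversion where it would shift the estimate by less than roughly 1.7%. A self-contained check of the identity (plain Python, no Spark; M and n are illustrative values):

    from math import exp, log

    M = 2 ** 31 - 1   # stand-in for sys.maxint, the bucket count in patch 3
    n = 10 ** 9       # true number of distinct values

    # Expected number of distinct hashes after bucketing n distinct items
    # uniformly into M buckets: some items collide and are undercounted.
    observed = M * (1 - exp(-float(n) / M))

    # The correction added in patch 3 inverts that expectation.
    corrected = -M * log(1 - observed / M)

    print(int(observed))   # ~8.0e8, noticeably below n
    print(int(corrected))  # ~1.0e9, recovers n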
From 2ab157c567fe9f45394160377ac7f26c05ac2f0b Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 28 Aug 2014 13:38:47 -0700
Subject: [PATCH 6/8] fix doc tests

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a6e56e8796582..5b7ffc0f960e7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2015,7 +2015,7 @@ def countApproxDistinct(self, relativeSD=0.05):
         >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
         >>> 950 < n < 1050
         True
-        >>> n = self.sc.parallelize([i % 20 for i in range(1000)])
+        >>> n = self.sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
         >>> 18 < n < 22
         True
         """

From c38c4e474849e3c2a13b5f508ee217ce63dd56bf Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 28 Aug 2014 14:41:42 -0700
Subject: [PATCH 7/8] fix doc tests

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5b7ffc0f960e7..701acce5c01bd 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -62,7 +62,7 @@ def portable_hash(x):
 
     >>> portable_hash(None)
     0
-    >>> portable_hash((None, 1))
+    >>> portable_hash((None, 1)) & 0xffffffff
     219750521
     """
     if x is None:
@@ -2015,7 +2015,7 @@ def countApproxDistinct(self, relativeSD=0.05):
         >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
         >>> 950 < n < 1050
         True
-        >>> n = self.sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
+        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
         >>> 18 < n < 22
         True
         """

From e20da472979ccb4b12fe8d7d7a059fde9af4ef60 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 2 Sep 2014 12:12:33 -0700
Subject: [PATCH 8/8] remove the correction in Python

---
 python/pyspark/rdd.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 701acce5c01bd..6fc9f66bc5a94 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2023,15 +2023,9 @@ def countApproxDistinct(self, relativeSD=0.05):
             raise ValueError("relativeSD should be greater than 0.000017")
         if relativeSD > 0.37:
             raise ValueError("relativeSD should be smaller than 0.37")
-        hashRDD = self.map(lambda x: portable_hash(x) % sys.maxint)
-        c = hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
-        # range of hash is [0, sys.maxint]
-        if c > sys.maxint / 30:
-            # correction for hash collision in Python,
-            # hash collision probability is 1 - exp(-X), so X = - log(1 - p)
-            # see http://preshing.com/20110504/hash-collision-probabilities/
-            c = - sys.maxint * log(1 - float(c) / sys.maxint)
-        return int(c)
+        # the hash space in Java is 2^32
+        hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
+        return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
 
 
 class PipelinedRDD(RDD):
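Patch 8 drops the Python-side correction entirely and instead folds portable_hash(x) into the full 2^32-value hash space that Java uses, leaving all estimation to streamlib. A back-of-the-envelope check (plain Python, no Spark; cardinalities are illustrative) suggests why the residual collision error is ignorable: even at ten million distinct elements it is around 0.1%, far below the default relativeSD of 5%:

    from math import exp

    M = 2 ** 32  # size of the hash space used after patch 8
    for n in (10 ** 5, 10 ** 6, 10 ** 7):
        # Expected distinct hashes when n distinct items hash uniformly into M buckets.
        expected = M * (1 - exp(-float(n) / M))
        loss = 1.0 - expected / n
        print(n, "%.4f%%" % (100 * loss))
    # prints losses of roughly 0.001%, 0.01% and 0.1% respectively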