From ff2cbe3effddae2c92d7d1ddb14c6762b42ae5fa Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 4 Aug 2014 23:55:40 -0700 Subject: [PATCH 01/19] add missing API in SparkContext --- python/pyspark/context.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 2e80eb50f2207..a2fe7f14a20b2 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -259,6 +259,17 @@ def defaultMinPartitions(self): """ return self._jsc.sc().defaultMinPartitions() + @property + def isLocal(self): + """ + Whether the context run locally + """ + return self._jsc.isLocal() + + @property + def conf(self): + return self._conf + def stop(self): """ Shut down the SparkContext. @@ -724,6 +735,13 @@ def sparkUser(self): """ return self._jsc.sc().sparkUser() + @property + def startTime(self): + """ + Return the start time of context in millis seconds + """ + return self._jsc.startTime() + def cancelJobGroup(self, groupId): """ Cancel active jobs for the specified group. See L{SparkContext.setJobGroup} From e0b3d307bb8a5988425f4eeff17d8cfc5469e6e8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 01:11:41 -0700 Subject: [PATCH 02/19] add histogram() --- .../apache/spark/api/python/PythonRDD.scala | 22 ++- python/pyspark/rdd.py | 170 +++++++++++++++++- 2 files changed, 184 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0b5322c6fb965..adaa33b5d7dd7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} import org.apache.spark._ import org.apache.spark.SparkContext._ -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.api.java.{JavaDoubleRDD, JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -730,7 +730,25 @@ private[spark] object PythonRDD extends Logging { } /** - * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by + * Convert a RDD of serialized Python objects to RDD of Double, that is usable by + * PySpark. + */ + def pythonToJavaDouble(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaDoubleRDD = { + new JavaDoubleRDD(pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + iter.flatMap { row => + val obj = unpickle.loads(row) + if (batched) { + obj.asInstanceOf[JArrayList[_]].map(_.asInstanceOf[Double]) + } else { + Seq(obj.asInstanceOf[Double]) + } + } + }) + } + + /** + * Convert a RDD of Java objects to and RDD of serialized Python objects, that is usable by * PySpark. 
*/ def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 309f5a9b6038d..5c97bc6b1016b 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -32,6 +32,7 @@ import heapq from random import Random from math import sqrt, log +import array from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ @@ -734,6 +735,13 @@ def _collect_iterator_through_file(self, iterator): yield item os.unlink(tempFile.name) + def collectPartitions(self, partitions): + """ + Return an array that contains all of the elements in a specific + partition of this RDD. + """ + raise NotImplementedError + def reduce(self, f): """ Reduces the elements of this RDD using the specified commutative and @@ -808,23 +816,39 @@ def func(iterator): return self.mapPartitions(func).fold(zeroValue, combOp) - def max(self): + def max(self, comp=None): """ Find the maximum item in this RDD. - >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() + >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) + >>> rdd.max() 43.0 + >>> rdd.max(lambda a, b: cmp(str(a), str(b))) + 5.0 """ - return self.reduce(max) + if comp is not None: + func = lambda a, b: a if comp(a, b) >= 0 else b + else: + func = max + + return self.reduce(func) - def min(self): + def min(self, comp=None): """ Find the minimum item in this RDD. - >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() + >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) + >>> rdd.min() + 1.0 + >>> rdd.min(lambda a, b: cmp(str(a), str(b))) 1.0 """ - return self.reduce(min) + if comp is not None: + func = lambda a, b: a if comp(a, b) <= 0 else b + else: + func = min + + return self.reduce(func) def sum(self): """ @@ -854,6 +878,62 @@ def redFunc(left_counter, right_counter): return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) + def histogram(self, bucketCount, buckets=None, evenBuckets=False): + """ + Compute a histogram of the data. + + Compute a histogram using the provided buckets. The buckets + are all open to the left except for the last which is closed + e.g. for the array [1,10,20,50] the buckets are [1,10) [10,20) + [20,50] e.g 1<=x<10, 10<=x<20, 20<=x<50. And on the input of 1 + and 50 we would have a histogram of 1,0,0. + + Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) + this can be switched from an O(log n) inseration to O(1) per + element(where n = # buckets), if you set evenBuckets to true. + Buckets must be sorted and not contain any duplicates, Buckets + array must be at least two elements All NaN entries are treated + the same. If you have a NaN bucket it must be the maximum value + of the last position and all NaN entries will be counted in that + bucket. + + If buckets is not given, it will be generated by using bucketCount + number of buckets evenly spaced between the minimum and maximum + of the RDD. For example if the min value is 0 and the max is 100 + and there are two buckets the resulting buckets will be [0,50) + [50,100]. bucketCount must be at least 1 If the RDD contains + infinity, NaN throws an exception If the elements in RDD do not + vary (max == min) always returns a single bucket. 
+ + >>> rdd = sc.parallelize(range(50)) + >>> rdd.histogram(5) + [10L, 10L, 10L, 10L, 10L] + >>> rdd.histogram(3, [0, 5, 25, 50]) + [5L, 20L, 25L] + >>> rdd.histogram(4, [0, 15, 30, 45, 60], True) + [15L, 15L, 15L, 5L] + """ + + drdd = self.map(lambda x:float(x)) + batched = isinstance(drdd._jrdd_deserializer, BatchedSerializer) + jdrdd = self.ctx._jvm.PythonRDD.pythonToJavaDouble(drdd._jrdd, batched) + + if buckets is None: + if bucketCount < 1: + raise ValueError("bucketCount should be greater than 1") + + r = jdrdd.histogram(bucketCount) + return list(r._2()) + + elif len(buckets) != bucketCount + 1: + raise ValueError("bucketCount does not match with " + "length of buckets") + + jbuckets = self.ctx._gateway.new_array(self.ctx._gateway.jvm.java.lang.Double, len(buckets)) + for i in range(len(buckets)): + jbuckets[i] = float(buckets[i]) + return list(jdrdd.histogram(jbuckets, evenBuckets)) + def mean(self): """ Compute the mean of this RDD's elements. @@ -872,6 +952,7 @@ def variance(self): """ return self.stats().variance() + def stdev(self): """ Compute the standard deviation of this RDD's elements. @@ -1678,6 +1759,25 @@ def zip(self, other): other._jrdd_deserializer) return RDD(pairRDD, self.ctx, deserializer) + def zipPartitions(self, other, f): + """ + Zip this RDD's partitions with one (or more) RDD(s) and return a + new RDD by applying a function to the zipped partitions. + """ + raise NotImplementedError + + def zipWithIndex(self): + """ + Zips this RDD with its element indices. + """ + raise NotImplementedError + + def zipWithUniqueId(self): + """ + Zips this RDD with generated unique Long ids. + """ + raise NotImplementedError + def name(self): """ Return the name of this RDD. @@ -1743,6 +1843,64 @@ def _defaultReducePartitions(self): # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those # hashes aren't retained. + def lookup(self, key): + """ + Return the list of values in the RDD for key key. + """ + raise NotImplementedError + + def countApprox(self, timeout, confidence=1.0): + """ + :: Experimental :: + Approximate version of count() that returns a potentially incomplete + result within a timeout, even if not all tasks have finished. + """ + raise NotImplementedError + + def countApproxDistinct(self, timeout, confidence=1.0): + """ + Return approximate number of distinct elements in the RDD. + """ + raise NotImplementedError + + def countByValueApprox(self, timeout, confidence=1.0): + """ + :: Experimental:: + Approximate version of countByValue(). + """ + raise NotImplementedError + + def sumApprox(self, timeout, confidence=1.0): + """ + :: Experimental :: + Approximate operation to return the sum within a timeout + or meet the confidence. + """ + raise NotImplementedError + + def meanApprox(self, timeout, confidence=1.0): + """ + :: Experimental :: + Approximate operation to return the mean within a timeout + or meet the confidence. + """ + raise NotImplementedError + + def countApproxDistinctByKey(self): + """ + Return approximate number of distinct values for each key in this RDD. + """ + raise NotImplementedError + + def countByKeyApprox(self, timeout, confidence=1.0): + """ + :: Experimental :: + Approximate version of countByKey that can return a partial result if it does not finish within a timeout. 
+ """ + raise NotImplementedError + + + class PipelinedRDD(RDD): From 5d5be95d1dcede041950bf092e0c6438a3aac536 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 10:59:03 -0700 Subject: [PATCH 03/19] change histogram API --- python/pyspark/rdd.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 5c97bc6b1016b..d95a68c7d01b1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -878,7 +878,7 @@ def redFunc(left_counter, right_counter): return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) - def histogram(self, bucketCount, buckets=None, evenBuckets=False): + def histogram(self, buckets=None, even=False): """ Compute a histogram of the data. @@ -897,37 +897,33 @@ def histogram(self, bucketCount, buckets=None, evenBuckets=False): of the last position and all NaN entries will be counted in that bucket. - If buckets is not given, it will be generated by using bucketCount - number of buckets evenly spaced between the minimum and maximum - of the RDD. For example if the min value is 0 and the max is 100 + If buckets is an number, it will generates buckets which is + evenly spaced between the minimum and maximum of the RDD. For + example if the min value is 0 and the max is 100 and there are two buckets the resulting buckets will be [0,50) [50,100]. bucketCount must be at least 1 If the RDD contains infinity, NaN throws an exception If the elements in RDD do not vary (max == min) always returns a single bucket. - >>> rdd = sc.parallelize(range(50)) - >>> rdd.histogram(5) - [10L, 10L, 10L, 10L, 10L] + >>> rdd = sc.parallelize(range(51)) + >>> rdd.histogram(2) + ([0L, 25L, 50L], [25L, 26L] >>> rdd.histogram(3, [0, 5, 25, 50]) [5L, 20L, 25L] >>> rdd.histogram(4, [0, 15, 30, 45, 60], True) - [15L, 15L, 15L, 5L] + [15L, 15L, 15L, 6L] """ drdd = self.map(lambda x:float(x)) batched = isinstance(drdd._jrdd_deserializer, BatchedSerializer) jdrdd = self.ctx._jvm.PythonRDD.pythonToJavaDouble(drdd._jrdd, batched) - if buckets is None: - if bucketCount < 1: - raise ValueError("bucketCount should be greater than 1") + if isinstance(buckets, (int,long)): + if buckets < 1: + raise ValueError("buckets should be greater than 1") - r = jdrdd.histogram(bucketCount) - return list(r._2()) - - elif len(buckets) != bucketCount + 1: - raise ValueError("bucketCount does not match with " - "length of buckets") + r = jdrdd.histogram(buckets) + return list(r._r1()), list(r._2()) jbuckets = self.ctx._gateway.new_array(self.ctx._gateway.jvm.java.lang.Double, len(buckets)) for i in range(len(buckets)): From a95eca01ebfd023a5b016015b49d98abbd658287 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 15:30:10 -0700 Subject: [PATCH 04/19] add zipWithIndex and zipWithUniqueId --- python/pyspark/context.py | 9 +++++ python/pyspark/rdd.py | 80 +++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a2fe7f14a20b2..e03151e5391f6 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -781,6 +781,15 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) + def runApproximateJob(self, rdd, func, evaluator, timeout): + """ + :: DeveloperApi :: + Run a job that can return approximate results. 
+ + Not implemented. + """ + raise NotImplementedError + def _test(): import atexit diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d95a68c7d01b1..34c8fb50a4d21 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -907,10 +907,10 @@ def histogram(self, buckets=None, even=False): >>> rdd = sc.parallelize(range(51)) >>> rdd.histogram(2) - ([0L, 25L, 50L], [25L, 26L] - >>> rdd.histogram(3, [0, 5, 25, 50]) - [5L, 20L, 25L] - >>> rdd.histogram(4, [0, 15, 30, 45, 60], True) + ([0.0, 25.0, 50.0], [25L, 26L]) + >>> rdd.histogram([0, 5, 25, 50]) + [5L, 20L, 26L] + >>> rdd.histogram([0, 15, 30, 45, 60], True) [15L, 15L, 15L, 6L] """ @@ -923,12 +923,12 @@ def histogram(self, buckets=None, even=False): raise ValueError("buckets should be greater than 1") r = jdrdd.histogram(buckets) - return list(r._r1()), list(r._2()) + return list(r._1()), list(r._2()) jbuckets = self.ctx._gateway.new_array(self.ctx._gateway.jvm.java.lang.Double, len(buckets)) for i in range(len(buckets)): jbuckets[i] = float(buckets[i]) - return list(jdrdd.histogram(jbuckets, evenBuckets)) + return list(jdrdd.histogram(jbuckets, even)) def mean(self): """ @@ -1750,29 +1750,56 @@ def zip(self, other): >>> x.zip(y).collect() [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] """ + if self.getNumPartitions() != other.getNumPartitions(): + raise ValueError("the number of partitions dose not match" + " with each other") + pairRDD = self._jrdd.zip(other._jrdd) deserializer = PairDeserializer(self._jrdd_deserializer, other._jrdd_deserializer) return RDD(pairRDD, self.ctx, deserializer) - def zipPartitions(self, other, f): + def zipPartitions(self, other, f, preservesPartitioning=False): """ Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. + + Not implemented. """ raise NotImplementedError def zipWithIndex(self): """ Zips this RDD with its element indices. + + >>> sc.parallelize(range(4), 2).zipWithIndex().collect() + [(0, 0), (1, 1), (2, 2), (3, 3)] """ - raise NotImplementedError + nums = self.glom().map(lambda it: sum(1 for i in it)).collect() + starts = [0] + for i in range(len(nums) - 1): + starts.append(starts[-1] + nums[i]) + + def func(k, it): + for i, v in enumerate(it): + yield starts[k] + i, v + + return self.mapPartitionsWithIndex(func) def zipWithUniqueId(self): """ Zips this RDD with generated unique Long ids. + + >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect() + [(0, 0), (2, 1), (1, 2), (3, 3)] """ - raise NotImplementedError + n = self.getNumPartitions() + + def func(k, it): + for i, v in enumerate(it): + yield i * n + k, v + + return self.mapPartitionsWithIndex(func) def name(self): """ @@ -1842,63 +1869,78 @@ def _defaultReducePartitions(self): def lookup(self, key): """ Return the list of values in the RDD for key key. + + Not Implemented """ raise NotImplementedError - def countApprox(self, timeout, confidence=1.0): + def countApprox(self, timeout, confidence=0.95): """ :: Experimental :: Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished. + + Not implemented. """ raise NotImplementedError - def countApproxDistinct(self, timeout, confidence=1.0): + def countApproxDistinct(self, timeout, confidence=0.95): """ + :: Experimental :: Return approximate number of distinct elements in the RDD. + + Not implemented. 
""" raise NotImplementedError - def countByValueApprox(self, timeout, confidence=1.0): + def countByValueApprox(self, timeout, confidence=0.95): """ :: Experimental:: Approximate version of countByValue(). + + Not implemented. """ raise NotImplementedError - def sumApprox(self, timeout, confidence=1.0): + def sumApprox(self, timeout, confidence=0.95): """ :: Experimental :: Approximate operation to return the sum within a timeout or meet the confidence. + + Not implemented. """ raise NotImplementedError - def meanApprox(self, timeout, confidence=1.0): + def meanApprox(self, timeout, confidence=0.95): """ :: Experimental :: Approximate operation to return the mean within a timeout or meet the confidence. + + Not implemented. """ raise NotImplementedError - def countApproxDistinctByKey(self): + def countApproxDistinctByKey(self, timeout, confidence=0.95): """ + :: Experimental :: Return approximate number of distinct values for each key in this RDD. + + Not implemented. """ raise NotImplementedError - def countByKeyApprox(self, timeout, confidence=1.0): + def countByKeyApprox(self, timeout, confidence=0.95): """ :: Experimental :: Approximate version of countByKey that can return a partial result if it does not finish within a timeout. + + Not implemented. """ raise NotImplementedError - - - class PipelinedRDD(RDD): """ From 4ffae0031e1f00641845fc5e9e3b62f54e7c56ad Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 16:50:57 -0700 Subject: [PATCH 05/19] collectPartitions() --- python/pyspark/rdd.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 34c8fb50a4d21..8cbc6b7b420ae 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -737,10 +737,16 @@ def _collect_iterator_through_file(self, iterator): def collectPartitions(self, partitions): """ - Return an array that contains all of the elements in a specific + Return a list of list that contains all of the elements in a specific partition of this RDD. + + >>> rdd = sc.parallelize(range(8), 4) + >>> rdd.collectPartitions([1, 3]) + [[2, 3], [6, 7]] """ - raise NotImplementedError + + return [self.ctx.runJob(self, lambda it: it, [p], True) + for p in partitions] def reduce(self, f): """ From 7a9ea0a4fbb804101d6180bd95a5964822f7a6a0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 16:56:30 -0700 Subject: [PATCH 06/19] update docs of histogram --- python/pyspark/rdd.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8cbc6b7b420ae..ee5e32e892c93 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -903,13 +903,14 @@ def histogram(self, buckets=None, even=False): of the last position and all NaN entries will be counted in that bucket. - If buckets is an number, it will generates buckets which is + If buckets is a number, it will generates buckets which is evenly spaced between the minimum and maximum of the RDD. For - example if the min value is 0 and the max is 100 - and there are two buckets the resulting buckets will be [0,50) - [50,100]. bucketCount must be at least 1 If the RDD contains - infinity, NaN throws an exception If the elements in RDD do not - vary (max == min) always returns a single bucket. + example, if the min value is 0 and the max is 100, given buckets + as 2, the resulting buckets will be [0,50) [50,100]. 
buckets must + be at least 1 If the RDD contains infinity, NaN throws an exception + If the elements in RDD do not vary (max == min) always returns + a single bucket. It will return an tuple of buckets and histogram + in them. >>> rdd = sc.parallelize(range(51)) >>> rdd.histogram(2) From 53640be1de45418dbae9406f9717873c62bfa584 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 22:38:57 -0700 Subject: [PATCH 07/19] histogram() in pure Python, better support for int --- .../apache/spark/api/python/PythonRDD.scala | 18 --- python/pyspark/rdd.py | 118 ++++++++++++------ 2 files changed, 78 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index adaa33b5d7dd7..756f7ef823292 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -729,24 +729,6 @@ private[spark] object PythonRDD extends Logging { }.toJavaRDD() } - /** - * Convert a RDD of serialized Python objects to RDD of Double, that is usable by - * PySpark. - */ - def pythonToJavaDouble(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaDoubleRDD = { - new JavaDoubleRDD(pyRDD.rdd.mapPartitions { iter => - val unpickle = new Unpickler - iter.flatMap { row => - val obj = unpickle.loads(row) - if (batched) { - obj.asInstanceOf[JArrayList[_]].map(_.asInstanceOf[Double]) - } else { - Seq(obj.asInstanceOf[Double]) - } - } - }) - } - /** * Convert a RDD of Java objects to and RDD of serialized Python objects, that is usable by * PySpark. diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index ee5e32e892c93..110ef5a4ec293 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -17,22 +17,22 @@ from base64 import standard_b64encode as b64enc import copy -from collections import defaultdict -from collections import namedtuple -from itertools import chain, ifilter, imap import operator import os import sys import shlex import traceback -from subprocess import Popen, PIPE -from tempfile import NamedTemporaryFile -from threading import Thread import warnings import heapq +import array +from collections import defaultdict, namedtuple +from itertools import chain, ifilter, imap from random import Random from math import sqrt, log -import array +from bisect import bisect_right +from subprocess import Popen, PIPE +from tempfile import NamedTemporaryFile +from threading import Thread from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ @@ -884,58 +884,96 @@ def redFunc(left_counter, right_counter): return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) - def histogram(self, buckets=None, even=False): + def histogram(self, buckets, even=False): """ - Compute a histogram of the data. - Compute a histogram using the provided buckets. The buckets - are all open to the left except for the last which is closed - e.g. for the array [1,10,20,50] the buckets are [1,10) [10,20) - [20,50] e.g 1<=x<10, 10<=x<20, 20<=x<50. And on the input of 1 - and 50 we would have a histogram of 1,0,0. + are all open to the right except for the last which is closed. + e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], + which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1 + and 50 we would have a histogram of 1,0,1. - Note: if your histogram is evenly spaced (e.g. 
[0, 10, 20, 30]) + If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), this can be switched from an O(log n) inseration to O(1) per - element(where n = # buckets), if you set evenBuckets to true. - Buckets must be sorted and not contain any duplicates, Buckets - array must be at least two elements All NaN entries are treated - the same. If you have a NaN bucket it must be the maximum value - of the last position and all NaN entries will be counted in that - bucket. - - If buckets is a number, it will generates buckets which is + element(where n = # buckets), if you set `even` to True. + + Buckets must be sorted and not contain any duplicates, must be + at least two elements. + + If `buckets` is a number, it will generates buckets which is evenly spaced between the minimum and maximum of the RDD. For example, if the min value is 0 and the max is 100, given buckets as 2, the resulting buckets will be [0,50) [50,100]. buckets must be at least 1 If the RDD contains infinity, NaN throws an exception If the elements in RDD do not vary (max == min) always returns - a single bucket. It will return an tuple of buckets and histogram - in them. + a single bucket. + + It will return an tuple of buckets and histogram. >>> rdd = sc.parallelize(range(51)) >>> rdd.histogram(2) - ([0.0, 25.0, 50.0], [25L, 26L]) + ([0, 25, 50], [25, 26]) >>> rdd.histogram([0, 5, 25, 50]) - [5L, 20L, 26L] + ([0, 5, 25, 50], [5, 20, 26]) >>> rdd.histogram([0, 15, 30, 45, 60], True) - [15L, 15L, 15L, 6L] + ([0, 15, 30, 45, 60], [15, 15, 15, 6]) """ - drdd = self.map(lambda x:float(x)) - batched = isinstance(drdd._jrdd_deserializer, BatchedSerializer) - jdrdd = self.ctx._jvm.PythonRDD.pythonToJavaDouble(drdd._jrdd, batched) - - if isinstance(buckets, (int,long)): + if isinstance(buckets, (int, long)): if buckets < 1: - raise ValueError("buckets should be greater than 1") + raise ValueError("buckets should not less than 1") + + # faster than stats() + def minmax(it): + minv, maxv = float("inf"), float("-inf") + for v in it: + minv = min(minv, v) + maxv = max(maxv, v) + return [(minv, maxv)] - r = jdrdd.histogram(buckets) - return list(r._1()), list(r._2()) + def _merge(a, b): + return (min(a[0], b[0]), max(a[1], b[1])) - jbuckets = self.ctx._gateway.new_array(self.ctx._gateway.jvm.java.lang.Double, len(buckets)) - for i in range(len(buckets)): - jbuckets[i] = float(buckets[i]) - return list(jdrdd.histogram(jbuckets, even)) + minv, maxv = self.mapPartitions(minmax).reduce(_merge) + + if minv == maxv or buckets == 1: + return [minv, maxv], [self.count()] + + inc = (maxv - minv) / buckets + # keep them as integer if possible + if inc * buckets != maxv - minv: + inc = (maxv - minv) * 1.0 / buckets + + buckets = [i * inc + minv for i in range(buckets)] + buckets.append(maxv) # fix accumuated error + even = True + + else: + if len(buckets) < 2: + raise ValueError("buckets should have more than one value") + if sorted(buckets) != buckets: + raise ValueError("buckets should be sorted") + + minv = buckets[0] + maxv = buckets[-1] + inc = buckets[1] - buckets[0] if even else None + + def histogram(iterator): + counters = [0] * len(buckets) + for i in iterator: + if i > maxv or i < minv: + continue + t = (int((i - minv) / inc) if even + else bisect_right(buckets, i) - 1) + counters[t] += 1 + # add last two together + last = counters.pop() + counters[-1] += last + return [counters] + + def mergeCounters(a, b): + return [i + j for i, j in zip(a, b)] + + return buckets, self.mapPartitions(histogram).reduce(mergeCounters) def mean(self): 
""" From 9a01ac30a0b03bf265db4bda5fdd3e2a49bddf8d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Aug 2014 22:42:33 -0700 Subject: [PATCH 08/19] fix docs --- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 756f7ef823292..54f1c3383e453 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} import org.apache.spark._ import org.apache.spark.SparkContext._ -import org.apache.spark.api.java.{JavaDoubleRDD, JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -730,7 +730,7 @@ private[spark] object PythonRDD extends Logging { } /** - * Convert a RDD of Java objects to and RDD of serialized Python objects, that is usable by + * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by * PySpark. */ def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { From 7ba5f882ab124ffb77414024f6c9d3297c2351c9 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Aug 2014 10:58:24 -0700 Subject: [PATCH 09/19] refactor --- python/pyspark/rdd.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 110ef5a4ec293..d934d90fdf59a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -923,17 +923,9 @@ def histogram(self, buckets, even=False): raise ValueError("buckets should not less than 1") # faster than stats() - def minmax(it): - minv, maxv = float("inf"), float("-inf") - for v in it: - minv = min(minv, v) - maxv = max(maxv, v) - return [(minv, maxv)] - - def _merge(a, b): - return (min(a[0], b[0]), max(a[1], b[1])) - - minv, maxv = self.mapPartitions(minmax).reduce(_merge) + def minmax(a, b): + return min(a[0], b[0]), max(a[1], b[1]) + minv, maxv = self.map(lambda x:(x,x)).reduce(minmax) if minv == maxv or buckets == 1: return [minv, maxv], [self.count()] From a25c34e49bf2958b210fc0c0c793f51d00b726fa Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Aug 2014 14:33:46 -0700 Subject: [PATCH 10/19] fix bug of countApproxDistinct --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e1c49e35abecd..0159003c88e06 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1004,7 +1004,7 @@ abstract class RDD[T: ClassTag]( }, (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { h1.addAll(h2) - h2 + h1 }).cardinality() } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index b31e3a09e5b9c..4a7dc8dca25e2 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ 
-81,11 +81,11 @@ class RDDSuite extends FunSuite with SharedSparkContext { def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble - val size = 100 - val uniformDistro = for (i <- 1 to 100000) yield i % size - val simpleRdd = sc.makeRDD(uniformDistro) - assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4) - assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1) + val size = 1000 + val uniformDistro = for (i <- 1 to 5000) yield i % size + val simpleRdd = sc.makeRDD(uniformDistro, 10) + assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2) + assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1) } test("SparkContext.union") { From 1218b3b8c997e81cecc23a57f83cd79b5eac9147 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Aug 2014 14:34:18 -0700 Subject: [PATCH 11/19] add countApprox and countApproxDistinct meanApprox() and sumApprox() --- .../apache/spark/api/python/PythonRDD.scala | 19 +++- python/pyspark/rdd.py | 103 ++++++++++++++---- 2 files changed, 99 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 54f1c3383e453..f15c0bc2beb84 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} import org.apache.spark._ import org.apache.spark.SparkContext._ -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.api.java.{JavaDoubleRDD, JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -741,6 +741,23 @@ private[spark] object PythonRDD extends Logging { } } } + + /** + * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark. + */ + def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + iter.flatMap { row => + val obj = unpickle.loads(row) + if (batched) { + obj.asInstanceOf[JArrayList[_]] + } else { + Seq(obj) + } + } + }.toJavaRDD() + } } private diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d934d90fdf59a..41aa4748d389e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -216,6 +216,22 @@ def _parse_memory(s): return int(float(s[:-1]) * units[s[-1].lower()]) +class BoundedFloat(float): + """ + Bounded value is generated by approximate job, with confidence and low + bound and high bound. + + >>> BoundedFloat(100.0, 0.95, 95.0, 105.0) + 100.0 + """ + def __new__(cls, mean, confidence, low, high): + obj = float.__new__(cls, mean) + obj.confidence = confidence + obj.low = low + obj.high = high + return obj + + class RDD(object): """ @@ -1911,33 +1927,63 @@ def lookup(self, key): """ raise NotImplementedError - def countApprox(self, timeout, confidence=0.95): + def _is_pickled(self): + """ Return this RDD is serialized by Pickle or not. 
""" + der = self._jrdd_deserializer + if isinstance(der, PickleSerializer): + return True + if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer): + return True + return False + + def _to_jrdd(self): + """ Return an JavaRDD """ + if not self._is_pickled(): + self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024)) + batched = isinstance(self._jrdd_deserializer, BatchedSerializer) + return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched) + + def countApproxDistinct(self, relativeSD=0.05): """ :: Experimental :: - Approximate version of count() that returns a potentially incomplete - result within a timeout, even if not all tasks have finished. + Return approximate number of distinct elements in the RDD. - Not implemented. + The algorithm used is based on streamlib's implementation of + "HyperLogLog in Practice: Algorithmic Engineering of a State + of The Art Cardinality Estimation Algorithm", available + here. + + :param: relativeSD Relative accuracy. Smaller values create + counters that require more space. + It must be greater than 0.000017. + + >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct() + >>> 950 < n < 1050 + True """ - raise NotImplementedError + return self._to_jrdd().countApproxDistinct(relativeSD) - def countApproxDistinct(self, timeout, confidence=0.95): + def countApproxDistinctByKey(self, timeout, confidence=0.95): """ :: Experimental :: - Return approximate number of distinct elements in the RDD. + Return approximate number of distinct values for each key in this RDD. Not implemented. """ raise NotImplementedError - def countByValueApprox(self, timeout, confidence=0.95): + def countApprox(self, timeout, confidence=0.95): """ - :: Experimental:: - Approximate version of countByValue(). + :: Experimental :: + Approximate version of count() that returns a potentially incomplete + result within a timeout, even if not all tasks have finished. - Not implemented. + >>> rdd = sc.parallelize(range(1000), 10) + >>> rdd.countApprox(1000, 1.0) + 1000 """ - raise NotImplementedError + drdd = self.mapPartitions(lambda it:[float(sum(1 for i in it))]) + return int(drdd.sumApprox(timeout, confidence)) def sumApprox(self, timeout, confidence=0.95): """ @@ -1945,9 +1991,15 @@ def sumApprox(self, timeout, confidence=0.95): Approximate operation to return the sum within a timeout or meet the confidence. - Not implemented. + >>> rdd = sc.parallelize(range(1000), 10) + >>> r = sum(xrange(1000)) + >>> (rdd.sumApprox(1000) - r) / r < 0.05 + True """ - raise NotImplementedError + jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd() + jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) + r = jdrdd.sumApprox(timeout, confidence).getFinalValue() + return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) def meanApprox(self, timeout, confidence=0.95): """ @@ -1955,27 +2007,34 @@ def meanApprox(self, timeout, confidence=0.95): Approximate operation to return the mean within a timeout or meet the confidence. - Not implemented. 
+ >>> rdd = sc.parallelize(range(1000), 10) + >>> r = sum(xrange(1000)) / 1000.0 + >>> (rdd.meanApprox(1000) - r) / r < 0.05 + True """ - raise NotImplementedError + jrdd = self.map(float)._to_jrdd() + jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) + r = jdrdd.meanApprox(timeout, confidence).getFinalValue() + return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) - def countApproxDistinctByKey(self, timeout, confidence=0.95): + def countByKeyApprox(self, timeout, confidence=0.95): """ :: Experimental :: - Return approximate number of distinct values for each key in this RDD. + Approximate version of countByKey that can return a partial result + if it does not finish within a timeout. Not implemented. """ raise NotImplementedError - def countByKeyApprox(self, timeout, confidence=0.95): + def countByValueApprox(self, timeout, confidence=0.95): """ - :: Experimental :: - Approximate version of countByKey that can return a partial result if it does not finish within a timeout. + :: Experimental:: + Approximate version of countByValue(). Not implemented. """ - raise NotImplementedError + return self.map(lambda x: (x, None)).countByKeyApprox(timeout, confidence) class PipelinedRDD(RDD): From 91324562d5faa268051ead6a7c6de5fe8be8fbef Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Aug 2014 14:49:12 -0700 Subject: [PATCH 12/19] fix pep8 --- python/pyspark/rdd.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9b3b8f9bf77f5..18967da351836 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -944,7 +944,7 @@ def histogram(self, buckets, even=False): # faster than stats() def minmax(a, b): return min(a[0], b[0]), max(a[1], b[1]) - minv, maxv = self.map(lambda x:(x,x)).reduce(minmax) + minv, maxv = self.map(lambda x: (x, x)).reduce(minmax) if minv == maxv or buckets == 1: return [minv, maxv], [self.count()] @@ -955,7 +955,7 @@ def minmax(a, b): inc = (maxv - minv) * 1.0 / buckets buckets = [i * inc + minv for i in range(buckets)] - buckets.append(maxv) # fix accumuated error + buckets.append(maxv) # fix accumuated error even = True else: @@ -973,8 +973,7 @@ def histogram(iterator): for i in iterator: if i > maxv or i < minv: continue - t = (int((i - minv) / inc) if even - else bisect_right(buckets, i) - 1) + t = (int((i - minv) / inc) if even else bisect_right(buckets, i) - 1) counters[t] += 1 # add last two together last = counters.pop() @@ -1004,7 +1003,6 @@ def variance(self): """ return self.stats().variance() - def stdev(self): """ Compute the standard deviation of this RDD's elements. 
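For reference, the approximate operations added in the previous commit (countApprox, sumApprox, meanApprox) return the BoundedFloat subclass of float defined there, so the estimate carries its own error bounds as attributes. A minimal usage sketch, not part of the patch, assuming a live SparkContext bound to `sc`:

    rdd = sc.parallelize(range(1000), 10)
    m = rdd.meanApprox(1000)           # BoundedFloat: behaves like a plain float
    estimate = float(m)                # point estimate of the mean
    lo, hi = m.low, m.high             # bounds reported at confidence m.confidence
    total = rdd.sumApprox(1000, 0.95)  # sum, same BoundedFloat shape of result
    n = rdd.countApprox(1000)          # count, returned as a plain int
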
@@ -1816,7 +1814,7 @@ def zip(self, other): """ if self.getNumPartitions() != other.getNumPartitions(): raise ValueError("the number of partitions dose not match" - " with each other") + " with each other") pairRDD = self._jrdd.zip(other._jrdd) deserializer = PairDeserializer(self._jrdd_deserializer, @@ -1994,7 +1992,7 @@ def countApprox(self, timeout, confidence=0.95): >>> rdd.countApprox(1000, 1.0) 1000 """ - drdd = self.mapPartitions(lambda it:[float(sum(1 for i in it))]) + drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))]) return int(drdd.sumApprox(timeout, confidence)) def sumApprox(self, timeout, confidence=0.95): From 977e4741ceadf7716676aae120dd846c8c415376 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 7 Aug 2014 12:01:29 -0700 Subject: [PATCH 13/19] =?UTF-8?q?address=20comments=EF=BC=9A=20improve=20d?= =?UTF-8?q?ocs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/pyspark/context.py | 3 +++ python/pyspark/rdd.py | 26 ++++++++++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c6faa213860bb..f6f7f573aec5c 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -269,6 +269,9 @@ def isLocal(self): @property def conf(self): + """ + The L{SparkConf} object + """ return self._conf def stop(self): diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 18967da351836..3af5dfea97c0c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -862,11 +862,11 @@ def min(self, comp=None): """ Find the minimum item in this RDD. - >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) + >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0]) >>> rdd.min() - 1.0 + 2.0 >>> rdd.min(lambda a, b: cmp(str(a), str(b))) - 1.0 + 10.0 """ if comp is not None: func = lambda a, b: a if comp(a, b) <= 0 else b @@ -1834,13 +1834,22 @@ def zipWithIndex(self): """ Zips this RDD with its element indices. + The ordering is first based on the partition index and then the + ordering of items within each partition. So the first item in + the first partition gets index 0, and the last item in the last + partition receives the largest index. + + This method needs to trigger a spark job when this RDD contains + more than one partitions. + >>> sc.parallelize(range(4), 2).zipWithIndex().collect() [(0, 0), (1, 1), (2, 2), (3, 3)] """ - nums = self.glom().map(lambda it: sum(1 for i in it)).collect() starts = [0] - for i in range(len(nums) - 1): - starts.append(starts[-1] + nums[i]) + if self.getNumPartitions() > 1: + nums = self.glom().map(lambda it: sum(1 for i in it)).collect() + for i in range(len(nums) - 1): + starts.append(starts[-1] + nums[i]) def func(k, it): for i, v in enumerate(it): @@ -1852,6 +1861,11 @@ def zipWithUniqueId(self): """ Zips this RDD with generated unique Long ids. + Items in the kth partition will get ids k, n+k, 2*n+k, ..., where + n is the number of partitions. 
So there may exist gaps, but this + method won't trigger a spark job, which is different from + L{zipWithIndex} + >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect() [(0, 0), (2, 1), (1, 2), (3, 3)] """ From ac606ca1ed3e81329d6808f8a99557079e0f463f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 11 Aug 2014 11:48:48 -0700 Subject: [PATCH 14/19] comment out not implemented APIs add TODO for them --- python/pyspark/rdd.py | 72 +++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 41 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3af5dfea97c0c..ab69eeb2f7b3c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1821,14 +1821,12 @@ def zip(self, other): other._jrdd_deserializer) return RDD(pairRDD, self.ctx, deserializer) - def zipPartitions(self, other, f, preservesPartitioning=False): - """ - Zip this RDD's partitions with one (or more) RDD(s) and return a - new RDD by applying a function to the zipped partitions. - - Not implemented. - """ - raise NotImplementedError + # TODO + # def zipPartitions(self, other, f, preservesPartitioning=False): + # """ + # Zip this RDD's partitions with one (or more) RDD(s) and return a + # new RDD by applying a function to the zipped partitions. + # """ def zipWithIndex(self): """ @@ -1943,13 +1941,10 @@ def _defaultReducePartitions(self): # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. This could be an expensive operation, since those # hashes aren't retained. - def lookup(self, key): - """ - Return the list of values in the RDD for key key. - - Not Implemented - """ - raise NotImplementedError + # def lookup(self, key): + # """ + # Return the list of values in the RDD for key key. + # """ def _is_pickled(self): """ Return this RDD is serialized by Pickle or not. """ @@ -1987,14 +1982,12 @@ def countApproxDistinct(self, relativeSD=0.05): """ return self._to_jrdd().countApproxDistinct(relativeSD) - def countApproxDistinctByKey(self, timeout, confidence=0.95): - """ - :: Experimental :: - Return approximate number of distinct values for each key in this RDD. - - Not implemented. - """ - raise NotImplementedError + # TODO + # def countApproxDistinctByKey(self, timeout, confidence=0.95): + # """ + # :: Experimental :: + # Return approximate number of distinct values for each key in this RDD. + # """ def countApprox(self, timeout, confidence=0.95): """ @@ -2041,24 +2034,21 @@ def meanApprox(self, timeout, confidence=0.95): r = jdrdd.meanApprox(timeout, confidence).getFinalValue() return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) - def countByKeyApprox(self, timeout, confidence=0.95): - """ - :: Experimental :: - Approximate version of countByKey that can return a partial result - if it does not finish within a timeout. - - Not implemented. - """ - raise NotImplementedError - - def countByValueApprox(self, timeout, confidence=0.95): - """ - :: Experimental:: - Approximate version of countByValue(). - - Not implemented. - """ - return self.map(lambda x: (x, None)).countByKeyApprox(timeout, confidence) + # TODO + # def countByKeyApprox(self, timeout, confidence=0.95): + # """ + # :: Experimental :: + # Approximate version of countByKey that can return a partial result + # if it does not finish within a timeout. + # """ + # + # def countByValueApprox(self, timeout, confidence=0.95): + # """ + # :: Experimental:: + # Approximate version of countByValue(). 
+ # + # """ + # return self.map(lambda x: (x, None)).countByKeyApprox(timeout, confidence) class PipelinedRDD(RDD): From f0158e4bb3db74e9c96035a2ebd6192ad256fc3e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 11 Aug 2014 11:50:25 -0700 Subject: [PATCH 15/19] comment out not implemented API in SparkContext --- python/pyspark/context.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index f6f7f573aec5c..083bc272e56be 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -787,14 +787,12 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) - def runApproximateJob(self, rdd, func, evaluator, timeout): - """ - :: DeveloperApi :: - Run a job that can return approximate results. - - Not implemented. - """ - raise NotImplementedError + # TODO + # def runApproximateJob(self, rdd, func, evaluator, timeout): + # """ + # :: DeveloperApi :: + # Run a job that can return approximate results. + # """ def _test(): From cb4f7123924065b19145b29ae5de9ef0b756886d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 13 Aug 2014 15:30:08 -0700 Subject: [PATCH 16/19] Mark SparkConf as read-only after initialization --- python/pyspark/conf.py | 21 +++++++++++++++++++++ python/pyspark/context.py | 3 +++ python/pyspark/rdd.py | 2 +- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index fb716f6753a45..bd4497ef4a46b 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -54,6 +54,17 @@ (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')] """ +import functools + + +def check_readonly(f): + @functools.wraps(f) + def func(self, *a, **kw): + if self._readonly: + raise Exception("Configuration can not be changed after initialization") + return f(self, *a, **kw) + return func + class SparkConf(object): @@ -96,32 +107,41 @@ def __init__(self, loadDefaults=True, _jvm=None, _jconf=None): _jvm = _jvm or SparkContext._jvm self._jconf = _jvm.SparkConf(loadDefaults) + # Configuration can not be changed after initialization + self._readonly = False + + @check_readonly def set(self, key, value): """Set a configuration property.""" self._jconf.set(key, unicode(value)) return self + @check_readonly def setIfMissing(self, key, value): """Set a configuration property, if not already set.""" if self.get(key) is None: self.set(key, value) return self + @check_readonly def setMaster(self, value): """Set master URL to connect to.""" self._jconf.setMaster(value) return self + @check_readonly def setAppName(self, value): """Set application name.""" self._jconf.setAppName(value) return self + @check_readonly def setSparkHome(self, value): """Set path where Spark is installed on worker nodes.""" self._jconf.setSparkHome(value) return self + @check_readonly def setExecutorEnv(self, key=None, value=None, pairs=None): """Set an environment variable to be passed to executors.""" if (key is not None and pairs is not None) or (key is None and pairs is None): @@ -133,6 +153,7 @@ def setExecutorEnv(self, key=None, value=None, pairs=None): self._jconf.setExecutorEnv(k, v) return self + @check_readonly def setAll(self, pairs): """ Set multiple parameters, passed as a list of key-value pairs. 
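A short sketch of the behaviour the check_readonly decorator is meant to enforce once the context.py change later in this commit flips `_readonly` — illustration only, not part of the patch, and "spark.foo" is an arbitrary key:

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("demo").setMaster("local")  # still mutable here
    sc = SparkContext(conf=conf)    # creating the context marks the conf read-only
    conf.set("spark.foo", "bar")    # now raises: "Configuration can not be
                                    # changed after initialization"
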
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 083bc272e56be..f6e08af8642a4 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -153,6 +153,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, # Create the Java SparkContext through Py4J self._jsc = self._initialize_context(self._conf._jconf) + self._conf._readonly = True # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server @@ -271,6 +272,8 @@ def isLocal(self): def conf(self): """ The L{SparkConf} object + + Configuration can not be changed after initialization. """ return self._conf diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index ab69eeb2f7b3c..79289fb58228c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1956,7 +1956,7 @@ def _is_pickled(self): return False def _to_jrdd(self): - """ Return an JavaRDD """ + """ Return an JavaRDD of Object by unpickling""" if not self._is_pickled(): self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024)) batched = isinstance(self._jrdd_deserializer, BatchedSerializer) From 63c013da7fc73361e07a2fdd248ef5a50092f090 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 14 Aug 2014 16:00:08 -0700 Subject: [PATCH 17/19] address all the comments: 1. implement lookup(), similar to that in Scala 2. handle None, nan, inf in histogram, add many tests 3. remove collectPartitions() 4. improve docs --- python/pyspark/rdd.py | 88 +++++++++++++++++++++++------------- python/pyspark/tests.py | 99 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 30 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 412d746f98149..27fe0aa5555e0 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -26,6 +26,7 @@ import heapq import array import bisect +import math from collections import defaultdict, namedtuple from itertools import chain, ifilter, imap from random import Random @@ -755,19 +756,6 @@ def _collect_iterator_through_file(self, iterator): yield item os.unlink(tempFile.name) - def collectPartitions(self, partitions): - """ - Return a list of list that contains all of the elements in a specific - partition of this RDD. - - >>> rdd = sc.parallelize(range(8), 4) - >>> rdd.collectPartitions([1, 3]) - [[2, 3], [6, 7]] - """ - - return [self.ctx.runJob(self, lambda it: it, [p], True) - for p in partitions] - def reduce(self, f): """ Reduces the elements of this RDD using the specified commutative and @@ -846,6 +834,9 @@ def max(self, comp=None): """ Find the maximum item in this RDD. + @param comp: A function used to compare two elements, the builtin `cmp` + will be used by default. + >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) >>> rdd.max() 43.0 @@ -863,6 +854,9 @@ def min(self, comp=None): """ Find the minimum item in this RDD. + @param comp: A function used to compare two elements, the builtin `cmp` + will be used by default. + >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0]) >>> rdd.min() 2.0 @@ -904,7 +898,7 @@ def redFunc(left_counter, right_counter): return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) - def histogram(self, buckets, even=False): + def histogram(self, buckets, evenBuckets=False): """ Compute a histogram using the provided buckets. The buckets are all open to the right except for the last which is closed. 
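As a pure-Python illustration of the bucketing rule described in this docstring and implemented in the hunk that follows — buckets open on the right, last bucket closed, constant-time indexing when the buckets are evenly spaced versus a bisect lookup otherwise. This is a standalone sketch, not the patched code itself:

    from bisect import bisect_right

    buckets = [0, 10, 20, 30]                 # buckets: [0,10) [10,20) [20,30]

    def bucket_index(x, evenBuckets=False):
        if x < buckets[0] or x > buckets[-1]:
            return None                       # out of range: histogram() skips it
        if evenBuckets:                       # evenly spaced: O(1) arithmetic
            inc = buckets[1] - buckets[0]
            i = int((x - buckets[0]) / inc)
        else:                                 # general case: O(log n) binary search
            i = bisect_right(buckets, x) - 1
        return min(i, len(buckets) - 2)       # fold the closing edge into the last bucket

    bucket_index(9)          # 0 -> counted in [0,10)
    bucket_index(10)         # 1 -> counted in [10,20)
    bucket_index(30, True)   # 2 -> closed right edge of [20,30]
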
@@ -942,39 +936,54 @@ def histogram(self, buckets, even=False): if buckets < 1: raise ValueError("buckets should not less than 1") + # filter out non-comparable elements + self = self.filter(lambda x: x is not None and not math.isnan(x)) + # faster than stats() def minmax(a, b): return min(a[0], b[0]), max(a[1], b[1]) - minv, maxv = self.map(lambda x: (x, x)).reduce(minmax) + try: + minv, maxv = self.map(lambda x: (x, x)).reduce(minmax) + except TypeError as e: + if e.message == "reduce() of empty sequence with no initial value": + raise ValueError("can not generate buckets from empty RDD") + raise if minv == maxv or buckets == 1: return [minv, maxv], [self.count()] inc = (maxv - minv) / buckets + if math.isinf(inc): + raise ValueError("Can not generate buckets with infinite value") + # keep them as integer if possible if inc * buckets != maxv - minv: inc = (maxv - minv) * 1.0 / buckets buckets = [i * inc + minv for i in range(buckets)] - buckets.append(maxv) # fix accumuated error - even = True + buckets.append(maxv) # fix accumulated error + evenBuckets = True else: if len(buckets) < 2: raise ValueError("buckets should have more than one value") + + if any(i is None or math.isnan(i) for i in buckets): + raise ValueError("can not have None or NaN in buckets") + if sorted(buckets) != buckets: raise ValueError("buckets should be sorted") minv = buckets[0] maxv = buckets[-1] - inc = buckets[1] - buckets[0] if even else None + inc = buckets[1] - buckets[0] if evenBuckets else None def histogram(iterator): counters = [0] * len(buckets) for i in iterator: - if i > maxv or i < minv: + if i is None or math.isnan(i) or i > maxv or i < minv: continue - t = (int((i - minv) / inc) if even else bisect_right(buckets, i) - 1) + t = (int((i - minv) / inc) if evenBuckets else bisect_right(buckets, i) - 1) counters[t] += 1 # add last two together last = counters.pop() @@ -1846,7 +1855,7 @@ def zipWithIndex(self): """ starts = [0] if self.getNumPartitions() > 1: - nums = self.glom().map(lambda it: sum(1 for i in it)).collect() + nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() for i in range(len(nums) - 1): starts.append(starts[-1] + nums[i]) @@ -1938,14 +1947,26 @@ def _defaultReducePartitions(self): else: return self.getNumPartitions() - # TODO: `lookup` is disabled because we can't make direct comparisons based - # on the key; we need to compare the hash of the key to the hash of the - # keys in the pairs. This could be an expensive operation, since those - # hashes aren't retained. - # def lookup(self, key): - # """ - # Return the list of values in the RDD for key key. - # """ + def lookup(self, key): + """ + Return the list of values in the RDD for key `key`. This operation + is done efficiently if the RDD has a known partitioner by only + searching the partition that the key maps to. + + >>> l = range(1000) + >>> rdd = sc.parallelize(zip(l, l), 10) + >>> rdd.lookup(42) # slow + [42] + >>> sorted = rdd.sortByKey() + >>> sorted.lookup(42) # fast + [42] + """ + values = self.filter(lambda (k, v): k == key).values() + + if hasattr(self, "_partitionFunc"): + return self.ctx.runJob(self, lambda x: x, [self._partitionFunc(key)], False) + + return values.collect() def _is_pickled(self): """ Return this RDD is serialized by Pickle or not. 
""" @@ -1957,7 +1978,11 @@ def _is_pickled(self): return False def _to_jrdd(self): - """ Return an JavaRDD of Object by unpickling""" + """ Return an JavaRDD of Object by unpickling + + It will convert each Python object into Java object by Pyrolite, whenever the + RDD is serialized in batch or not. + """ if not self._is_pickled(): self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024)) batched = isinstance(self._jrdd_deserializer, BatchedSerializer) @@ -1973,6 +1998,9 @@ def countApproxDistinct(self, relativeSD=0.05): of The Art Cardinality Estimation Algorithm", available here. + This support all the types of objects, which is supported by + Pyrolite, nearly all builtin types. + :param: relativeSD Relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017. diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 22b51110ed671..e7455ee168f38 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -323,6 +323,105 @@ def test_namedtuple_in_rdd(self): theDoes = self.sc.parallelize([jon, jane]) self.assertEquals([jon, jane], theDoes.collect()) + def test_histogram(self): + # empty + rdd = self.sc.parallelize([]) + self.assertEquals([0], rdd.histogram([0, 10])[1]) + self.assertEquals([0], rdd.histogram([0, 10], True)[1]) + + # out of range + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0], rdd.histogram([0, 10])[1]) + self.assertEquals([0], rdd.histogram([0, 10], True)[1]) + + # in range with one bucket + rdd = self.sc.parallelize(range(1, 5)) + self.assertEquals([4], rdd.histogram([0, 10])[1]) + self.assertEquals([4], rdd.histogram([0, 10], True)[1]) + + # in range with one bucket exact match + self.assertEquals([4], rdd.histogram([1, 4])[1]) + self.assertEquals([4], rdd.histogram([1, 4], True)[1]) + + # out of range with two buckets + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1]) + self.assertEquals([0, 0], rdd.histogram([0, 5, 10], True)[1]) + + # out of range with two uneven buckets + rdd = self.sc.parallelize([10.01, -0.01]) + self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1]) + + # in range with two buckets + rdd = self.sc.parallelize([1, 2, 3, 5, 6]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1]) + + # in range with two bucket and None + rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1]) + + # in range with two uneven buckets + rdd = self.sc.parallelize([1, 2, 3, 5, 6]) + self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1]) + + # mixed range with two uneven buckets + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01]) + self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1]) + + # mixed range with four uneven buckets + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1]) + self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + + # mixed range with uneven buckets and NaN + rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, + 199.0, 200.0, 200.1, None, float('nan')]) + self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1]) + + # out of range with infinite buckets + rdd = self.sc.parallelize([10.01, -0.01, float('nan')]) + self.assertEquals([1, 1], rdd.histogram([float('-inf'), 0, float('inf')])[1]) 
+
+        # invalid buckets
+        self.assertRaises(ValueError, lambda: rdd.histogram([]))
+        self.assertRaises(ValueError, lambda: rdd.histogram([1]))
+
+        # without buckets
+        rdd = self.sc.parallelize(range(1, 5))
+        self.assertEquals(([1, 4], [4]), rdd.histogram(1))
+
+        # without buckets single element
+        rdd = self.sc.parallelize([1])
+        self.assertEquals(([1, 1], [1]), rdd.histogram(1))
+
+        # without bucket no range
+        rdd = self.sc.parallelize([1] * 4)
+        self.assertEquals(([1, 1], [4]), rdd.histogram(1))
+
+        # without buckets basic two
+        rdd = self.sc.parallelize(range(1, 5))
+        self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
+
+        # without buckets with more requested than elements
+        rdd = self.sc.parallelize([1, 2])
+        buckets = [1 + 0.2 * i for i in range(6)]
+        hist = [1, 0, 0, 0, 1]
+        self.assertEquals((buckets, hist), rdd.histogram(5))
+
+        # invalid RDDs
+        rdd = self.sc.parallelize([1, float('inf')])
+        self.assertRaises(ValueError, lambda: rdd.histogram(2))
+        rdd = self.sc.parallelize([float('nan')])
+        self.assertRaises(ValueError, lambda: rdd.histogram(2))
+
+    def test_count_approx_distinct(self):
+        rdd = self.sc.parallelize(range(1000))
+        self.assertTrue(950 < rdd.countApproxDistinct() < 1050)
+        self.assertTrue(950 < rdd.map(float).countApproxDistinct() < 1050)
+        self.assertTrue(950 < rdd.map(str).countApproxDistinct() < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 1050)
+
 class TestIO(PySparkTestCase):
 
From 1ac98d63642c625b80d41305c278044bdd940ccb Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 26 Aug 2014 11:25:01 -0700
Subject: [PATCH 18/19] remove splitted changes

---
 python/pyspark/rdd.py   | 199 ----------------------------------------
 python/pyspark/tests.py |  92 -------------------
 2 files changed, 291 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 94c806cfd9ec0..7c2a9f89b36ed 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -235,22 +235,6 @@ def _parse_memory(s):
     return int(float(s[:-1]) * units[s[-1].lower()])
 
 
-class BoundedFloat(float):
-    """
-    Bounded value is generated by approximate job, with confidence and low
-    bound and high bound.
-
-    >>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
-    100.0
-    """
-    def __new__(cls, mean, confidence, low, high):
-        obj = float.__new__(cls, mean)
-        obj.confidence = confidence
-        obj.low = low
-        obj.high = high
-        return obj
-
-
 class RDD(object):
 
     """
@@ -904,103 +888,6 @@ def redFunc(left_counter, right_counter):
 
         return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
 
-    def histogram(self, buckets, evenBuckets=False):
-        """
-        Compute a histogram using the provided buckets. The buckets
-        are all open to the right except for the last which is closed.
-        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
-        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
-        and 50 we would have a histogram of 1,0,1.
-
-        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
-        this can be switched from an O(log n) inseration to O(1) per
-        element(where n = # buckets), if you set `even` to True.
-
-        Buckets must be sorted and not contain any duplicates, must be
-        at least two elements.
-
-        If `buckets` is a number, it will generates buckets which is
-        evenly spaced between the minimum and maximum of the RDD. For
-        example, if the min value is 0 and the max is 100, given buckets
-        as 2, the resulting buckets will be [0,50) [50,100]. buckets must
-        be at least 1 If the RDD contains infinity, NaN throws an exception
-        If the elements in RDD do not vary (max == min) always returns
-        a single bucket.
-
-        It will return an tuple of buckets and histogram.
-
-        >>> rdd = sc.parallelize(range(51))
-        >>> rdd.histogram(2)
-        ([0, 25, 50], [25, 26])
-        >>> rdd.histogram([0, 5, 25, 50])
-        ([0, 5, 25, 50], [5, 20, 26])
-        >>> rdd.histogram([0, 15, 30, 45, 60], True)
-        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
-        """
-
-        if isinstance(buckets, (int, long)):
-            if buckets < 1:
-                raise ValueError("buckets should not less than 1")
-
-            # filter out non-comparable elements
-            self = self.filter(lambda x: x is not None and not math.isnan(x))
-
-            # faster than stats()
-            def minmax(a, b):
-                return min(a[0], b[0]), max(a[1], b[1])
-            try:
-                minv, maxv = self.map(lambda x: (x, x)).reduce(minmax)
-            except TypeError as e:
-                if e.message == "reduce() of empty sequence with no initial value":
-                    raise ValueError("can not generate buckets from empty RDD")
-                raise
-
-            if minv == maxv or buckets == 1:
-                return [minv, maxv], [self.count()]
-
-            inc = (maxv - minv) / buckets
-            if math.isinf(inc):
-                raise ValueError("Can not generate buckets with infinite value")
-
-            # keep them as integer if possible
-            if inc * buckets != maxv - minv:
-                inc = (maxv - minv) * 1.0 / buckets
-
-            buckets = [i * inc + minv for i in range(buckets)]
-            buckets.append(maxv)  # fix accumulated error
-            evenBuckets = True
-
-        else:
-            if len(buckets) < 2:
-                raise ValueError("buckets should have more than one value")
-
-            if any(i is None or math.isnan(i) for i in buckets):
-                raise ValueError("can not have None or NaN in buckets")
-
-            if sorted(buckets) != buckets:
-                raise ValueError("buckets should be sorted")
-
-            minv = buckets[0]
-            maxv = buckets[-1]
-            inc = buckets[1] - buckets[0] if evenBuckets else None
-
-        def histogram(iterator):
-            counters = [0] * len(buckets)
-            for i in iterator:
-                if i is None or math.isnan(i) or i > maxv or i < minv:
-                    continue
-                t = (int((i - minv) / inc) if evenBuckets else bisect_right(buckets, i) - 1)
-                counters[t] += 1
-            # add last two together
-            last = counters.pop()
-            counters[-1] += last
-            return [counters]
-
-        def mergeCounters(a, b):
-            return [i + j for i, j in zip(a, b)]
-
-        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
-
     def mean(self):
         """
         Compute the mean of this RDD's elements.
@@ -1972,47 +1859,6 @@ def _defaultReducePartitions(self):
         else:
             return self.getNumPartitions()
 
-    def lookup(self, key):
-        """
-        Return the list of values in the RDD for key `key`. This operation
-        is done efficiently if the RDD has a known partitioner by only
-        searching the partition that the key maps to.
-
-        >>> l = range(1000)
-        >>> rdd = sc.parallelize(zip(l, l), 10)
-        >>> rdd.lookup(42)  # slow
-        [42]
-        >>> sorted = rdd.sortByKey()
-        >>> sorted.lookup(42)  # fast
-        [42]
-        """
-        values = self.filter(lambda (k, v): k == key).values()
-
-        if hasattr(self, "_partitionFunc"):
-            return self.ctx.runJob(self, lambda x: x, [self._partitionFunc(key)], False)
-
-        return values.collect()
-
-    def _is_pickled(self):
-        """ Return this RDD is serialized by Pickle or not. """
-        der = self._jrdd_deserializer
-        if isinstance(der, PickleSerializer):
-            return True
-        if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
-            return True
-        return False
-
-    def _to_jrdd(self):
-        """ Return a JavaRDD of objects by unpickling
-
-        It will convert each Python object into a Java object via Pyrolite, whether
-        or not the RDD is serialized in batches.
-        """
-        if not self._is_pickled():
-            self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
-        batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
-        return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
-
     def countApproxDistinct(self, relativeSD=0.05):
         """
         :: Experimental ::
@@ -2043,51 +1889,6 @@ def countApproxDistinct(self, relativeSD=0.05):
     # Return approximate number of distinct values for each key in this RDD.
     # """
 
-    def countApprox(self, timeout, confidence=0.95):
-        """
-        :: Experimental ::
-        Approximate version of count() that returns a potentially incomplete
-        result within a timeout, even if not all tasks have finished.
-
-        >>> rdd = sc.parallelize(range(1000), 10)
-        >>> rdd.countApprox(1000, 1.0)
-        1000
-        """
-        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
-        return int(drdd.sumApprox(timeout, confidence))
-
-    def sumApprox(self, timeout, confidence=0.95):
-        """
-        :: Experimental ::
-        Approximate operation to return the sum within a timeout
-        or meet the confidence.
-
-        >>> rdd = sc.parallelize(range(1000), 10)
-        >>> r = sum(xrange(1000))
-        >>> (rdd.sumApprox(1000) - r) / r < 0.05
-        True
-        """
-        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
-        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
-        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
-        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
-
-    def meanApprox(self, timeout, confidence=0.95):
-        """
-        :: Experimental ::
-        Approximate operation to return the mean within a timeout
-        or meet the confidence.
-
-        >>> rdd = sc.parallelize(range(1000), 10)
-        >>> r = sum(xrange(1000)) / 1000.0
-        >>> (rdd.meanApprox(1000) - r) / r < 0.05
-        True
-        """
-        jrdd = self.map(float)._to_jrdd()
-        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
-        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
-        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
-
     # TODO
     # def countByKeyApprox(self, timeout, confidence=0.95):
     # """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index dfd1f787179dd..7d8ac72437183 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -332,98 +332,6 @@ def test_namedtuple_in_rdd(self):
         theDoes = self.sc.parallelize([jon, jane])
         self.assertEquals([jon, jane], theDoes.collect())
 
-    def test_histogram(self):
-        # empty
-        rdd = self.sc.parallelize([])
-        self.assertEquals([0], rdd.histogram([0, 10])[1])
-        self.assertEquals([0], rdd.histogram([0, 10], True)[1])
-
-        # out of range
-        rdd = self.sc.parallelize([10.01, -0.01])
-        self.assertEquals([0], rdd.histogram([0, 10])[1])
-        self.assertEquals([0], rdd.histogram([0, 10], True)[1])
-
-        # in range with one bucket
-        rdd = self.sc.parallelize(range(1, 5))
-        self.assertEquals([4], rdd.histogram([0, 10])[1])
-        self.assertEquals([4], rdd.histogram([0, 10], True)[1])
-
-        # in range with one bucket exact match
-        self.assertEquals([4], rdd.histogram([1, 4])[1])
-        self.assertEquals([4], rdd.histogram([1, 4], True)[1])
-
-        # out of range with two buckets
-        rdd = self.sc.parallelize([10.01, -0.01])
-        self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])
-        self.assertEquals([0, 0], rdd.histogram([0, 5, 10], True)[1])
-
-        # out of range with two uneven buckets
-        rdd = self.sc.parallelize([10.01, -0.01])
-        self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
-
-        # in range with two buckets
-        rdd = self.sc.parallelize([1, 2, 3, 5, 6])
-        self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
-        self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1])
-
-        # in range with two bucket and None
-        rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
-        self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
-        self.assertEquals([3, 2], rdd.histogram([0, 5, 10], True)[1])
-
-        # in range with two uneven buckets
-        rdd = self.sc.parallelize([1, 2, 3, 5, 6])
-        self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])
-
-        # mixed range with two uneven buckets
-        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
-        self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])
-
-        # mixed range with four uneven buckets
-        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
-        self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
-
-        # mixed range with uneven buckets and NaN
-        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
-                                   199.0, 200.0, 200.1, None, float('nan')])
-        self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
-
-        # out of range with infinite buckets
-        rdd = self.sc.parallelize([10.01, -0.01, float('nan')])
-        self.assertEquals([1, 1], rdd.histogram([float('-inf'), 0, float('inf')])[1])
-
-        # invalid buckets
-        self.assertRaises(ValueError, lambda: rdd.histogram([]))
-        self.assertRaises(ValueError, lambda: rdd.histogram([1]))
-
-        # without buckets
-        rdd = self.sc.parallelize(range(1, 5))
-        self.assertEquals(([1, 4], [4]), rdd.histogram(1))
-
-        # without buckets single element
-        rdd = self.sc.parallelize([1])
-        self.assertEquals(([1, 1], [1]), rdd.histogram(1))
-
-        # without bucket no range
-        rdd = self.sc.parallelize([1] * 4)
-        self.assertEquals(([1, 1], [4]), rdd.histogram(1))
-
-        # without buckets basic two
-        rdd = self.sc.parallelize(range(1, 5))
-        self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
-
-        # without buckets with more requested than elements
-        rdd = self.sc.parallelize([1, 2])
-        buckets = [1 + 0.2 * i for i in range(6)]
-        hist = [1, 0, 0, 0, 1]
-        self.assertEquals((buckets, hist), rdd.histogram(5))
-
-        # invalid RDDs
-        rdd = self.sc.parallelize([1, float('inf')])
-        self.assertRaises(ValueError, lambda: rdd.histogram(2))
-        rdd = self.sc.parallelize([float('nan')])
-        self.assertRaises(ValueError, lambda: rdd.histogram(2))
-
     def test_count_approx_distinct(self):
         rdd = self.sc.parallelize(range(1000))
         self.assertTrue(950 < rdd.countApproxDistinct() < 1050)

From 657a09bf14ae70639190f832d8b25dde6ee37c01 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 26 Aug 2014 12:05:41 -0700
Subject: [PATCH 19/19] remove countApproxDistinct()

---
 python/pyspark/rdd.py   | 23 -----------------------
 python/pyspark/tests.py |  7 -------
 2 files changed, 30 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7c2a9f89b36ed..c5854d9607c62 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1859,29 +1859,6 @@ def _defaultReducePartitions(self):
         else:
             return self.getNumPartitions()
 
-    def countApproxDistinct(self, relativeSD=0.05):
-        """
-        :: Experimental ::
-        Return approximate number of distinct elements in the RDD.
-
-        The algorithm used is based on streamlib's implementation of
-        "HyperLogLog in Practice: Algorithmic Engineering of a State
-        of The Art Cardinality Estimation Algorithm", available
-        here.
-
-        This supports all the types of objects that Pyrolite can
-        serialize, i.e. nearly all builtin types.
-
-        :param: relativeSD Relative accuracy. Smaller values create
-                counters that require more space.
-                It must be greater than 0.000017.
-
-        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
-        >>> 950 < n < 1050
-        True
-        """
-        return self._to_jrdd().countApproxDistinct(relativeSD)
-
     # TODO
     # def countApproxDistinctByKey(self, timeout, confidence=0.95):
     # """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7d8ac72437183..51bfbb47e53c2 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -332,13 +332,6 @@ def test_namedtuple_in_rdd(self):
         theDoes = self.sc.parallelize([jon, jane])
         self.assertEquals([jon, jane], theDoes.collect())
 
-    def test_count_approx_distinct(self):
-        rdd = self.sc.parallelize(range(1000))
-        self.assertTrue(950 < rdd.countApproxDistinct() < 1050)
-        self.assertTrue(950 < rdd.map(float).countApproxDistinct() < 1050)
-        self.assertTrue(950 < rdd.map(str).countApproxDistinct() < 1050)
-        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 1050)
-
     def test_large_broadcast(self):
         N = 100000
         data = [[float(i) for i in range(300)] for i in range(N)]