From 491601600a09de1f7f3b46b7459e320d4933cb08 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Mon, 10 Mar 2014 23:58:32 -0700
Subject: [PATCH 01/12] added histogram method, added max and min to statscounter

---
 python/pyspark/rdd.py         | 56 +++++++++++++++++++++++++++++++++++
 python/pyspark/statcounter.py | 23 ++++++++++++--
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index be23f87f5ed2d..3fff57ea35257 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -24,6 +24,7 @@
 import sys
 import shlex
 import traceback
+from bisect import bisect_right
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread
@@ -534,6 +535,7 @@ def func(iterator):
         return reduce(op, vals, zeroValue)

     # TODO: aggregate

+
     def sum(self):
         """
@@ -610,6 +612,60 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()

+    def getBuckets(self, bucketCount):
+        """
+        Compute a histogram of the data using bucketCount number of buckets
+        evenly spaced between the min and max of the RDD.
+
+        >>> sc.parallelize([1,49, 23, 100, 75, 50]).histogram()
+        {(0,49):3, (50, 100):3}
+        """
+
+        #use the statscounter as a quick way of getting max and min
+        mm_stats = self.stats()
+        min = mm_stats.min()
+        max = mm_stats.max()
+
+        increment = (max-min)/bucketCount
+        buckets = range(min,min)
+        if increment != 0:
+            buckets = range(min,max, increment)
+
+        return buckets
+
+    def histogram(self, bucketCount, buckets=None):
+        evenBuckets = False
+        if not buckets:
+            buckets = self.getBuckets(bucketCount)
+        if len(buckets) < 2:
+            raise ValueError("requires more than 1 bucket")
+        if len(buckets) % 2 == 0:
+            evenBuckets = True
+        # histogram partition
+        def histogramPartition(iterator):
+            counters = defaultdict(int)
+            for obj in iterator:
+                k = bisect_right(buckets, obj)
+                if k < len(buckets) and k > 0:
+                    key = (buckets[k-1], buckets[k]-1)
+                elif k == len(buckets):
+                    key = (buckets[k-1], float("inf"))
+                elif k == 0:
+                    key = (float("-inf"), buckets[k]-1)
+                counters[key] += 1
+            yield counters
+
+        # merge counters
+        def mergeCounters(d1, d2):
+            for k in d2.keys():
+                if k in d1:
+                    d1[k] += d2[k]
+            return d1
+
+        #map partitions(histogram_partition(bucketFunction)).reduce(mergeCounters)
+        return self.mapPartitions(histogramPartition).reduce(mergeCounters)
+
+
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 8e1cbd4ad9856..ee75c414dcbf6 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -26,7 +26,9 @@ def __init__(self, values=[]):
         self.n = 0L    # Running count of our values
         self.mu = 0.0  # Running mean of our values
         self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)
-
+        self.max_v = float("-inf")
+        self.min_v = float("inf")
+
         for v in values:
             self.merge(v)

@@ -36,6 +38,11 @@ def merge(self, value):
         self.n += 1
         self.mu += delta / self.n
         self.m2 += delta * (value - self.mu)
+        if self.max_v < value:
+            self.max_v = value
+        if self.min_v > value:
+            self.min_v = value
+
         return self

     # Merge another StatCounter into this one, adding up the internal statistics.
@@ -49,7 +56,10 @@ def mergeStats(self, other):
             if self.n == 0:
                 self.mu = other.mu
                 self.m2 = other.m2
-                self.n = other.n
+                self.n = other.n
+                self.max_v = other.max_v
+                self.min_v = other.min_v
+
             elif other.n != 0:
                 delta = other.mu - self.mu
                 if other.n * 10 < self.n:
@@ -58,6 +68,9 @@ def mergeStats(self, other):
                     self.mu = other.mu - (delta * self.n) / (self.n + other.n)
                 else:
                     self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
+
+                self.max_v = max(self.max_v, other.max_v)
+                self.min_v = min(self.min_v, other.min_v)
                 self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                 self.n += other.n

@@ -76,6 +89,12 @@ def mean(self):
     def sum(self):
         return self.n * self.mu

+    def min(self):
+        return self.min_v
+
+    def max(self):
+        return self.max_v
+
     # Return the variance of the values.
     def variance(self):
         if self.n == 0:

From eaf89d957e84d3b926f6c5f3f65acb8764c7ec2f Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 00:34:30 -0700
Subject: [PATCH 02/12] added correct doctest for histogram

---
 python/pyspark/rdd.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3fff57ea35257..bef32c1f37dfa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -613,14 +613,6 @@ def sampleVariance(self):
         return self.stats().sampleVariance()

     def getBuckets(self, bucketCount):
-        """
-        Compute a histogram of the data using bucketCount number of buckets
-        evenly spaced between the min and max of the RDD.
-
-        >>> sc.parallelize([1,49, 23, 100, 75, 50]).histogram()
-        {(0,49):3, (50, 100):3}
-        """
-
         #use the statscounter as a quick way of getting max and min
         mm_stats = self.stats()
         min = mm_stats.min()
         max = mm_stats.max()
@@ -626,6 +626,14 @@ def getBuckets(self, bucketCount):
         return buckets

     def histogram(self, bucketCount, buckets=None):
+      """
+        Compute a histogram of the data using bucketCount number of buckets
+        evenly spaced between the min and max of the RDD.
+
+        >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
+        defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
+        """
+
         evenBuckets = False
         if not buckets:
             buckets = self.getBuckets(bucketCount)
         if len(buckets) < 2:
             raise ValueError("requires more than 1 bucket")

From 29981f20c28101a34bb15e939062dc24677c4773 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 00:57:33 -0700
Subject: [PATCH 03/12] fixed indentation on doctest comment

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bef32c1f37dfa..ef9d5638c7d60 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -626,7 +626,7 @@ def getBuckets(self, bucketCount):
         return buckets

     def histogram(self, bucketCount, buckets=None):
-      """
+        """
         Compute a histogram of the data using bucketCount number of buckets
         evenly spaced between the min and max of the RDD.
From 37a7deabb2bfffd7ced1c15d6a079e942b73e7b6 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Tue, 11 Mar 2014 11:44:22 -0700
Subject: [PATCH 04/12] cleaned up boundaries for histogram -- uses real min/max when buckets are derived

---
 python/pyspark/rdd.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ef9d5638c7d60..9cfc2a8028ce8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -623,7 +623,7 @@ def getBuckets(self, bucketCount):
         if increment != 0:
             buckets = range(min,max, increment)

-        return buckets
+        return {"min":min, "max":max, "buckets":buckets}

     def histogram(self, bucketCount, buckets=None):
@@ -633,10 +633,15 @@ def histogram(self, bucketCount, buckets=None):
         >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
         defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
         """
-
+        min = float("-inf")
+        max = float("inf")
         evenBuckets = False
         if not buckets:
-            buckets = self.getBuckets(bucketCount)
+            b = self.getBuckets(bucketCount)
+            buckets = b["buckets"]
+            min = b["min"]
+            max = b["max"]
+
         if len(buckets) < 2:
             raise ValueError("requires more than 1 bucket")
         if len(buckets) % 2 == 0:
             evenBuckets = True
@@ -649,9 +654,9 @@ def histogramPartition(iterator):
                 if k < len(buckets) and k > 0:
                     key = (buckets[k-1], buckets[k]-1)
                 elif k == len(buckets):
-                    key = (buckets[k-1], float("inf"))
+                    key = (buckets[k-1], max)
                 elif k == 0:
-                    key = (float("-inf"), buckets[k]-1)
+                    key = (min, buckets[k]-1)
                 counters[key] += 1
             yield counters

From 1e7056db8a9ce0364e46ad0e269bc1de9f9925f4 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Wed, 12 Mar 2014 10:42:09 -0700
Subject: [PATCH 05/12] added underscore to getBucket

---
 python/pyspark/rdd.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9cfc2a8028ce8..1db49cf10f38b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -612,7 +612,7 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()

-    def getBuckets(self, bucketCount):
+    def _getBuckets(self, bucketCount):
         #use the statscounter as a quick way of getting max and min
         mm_stats = self.stats()
         min = mm_stats.min()
@@ -637,7 +637,7 @@ def histogram(self, bucketCount, buckets=None):
         max = float("inf")
         evenBuckets = False
         if not buckets:
-            b = self.getBuckets(bucketCount)
+            b = self._getBuckets(bucketCount)
             buckets = b["buckets"]
             min = b["min"]
             max = b["max"]

From ed6713682ac5822b80f23fe1752c058e94033900 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 10:56:22 -0700
Subject: [PATCH 06/12] broke min/max out into separate transaction, added to rdd.py

---
 python/pyspark/rdd.py | 77 ++++++++++---------------------------------
 1 file changed, 18 insertions(+), 59 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1db49cf10f38b..04ee60a57e4cd 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -537,6 +537,24 @@ def func(iterator):

     # TODO: aggregate

+    def max(self):
+        """
+        Find the maximum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+        43.0
+        """
+        return self.stats().max()
+
+    def min(self):
+        """
+        Find the minimum item in this RDD.
+
+        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
+        1.0
+        """
+        return self.stats().min()
+
     def sum(self):
         """
         Add up the elements in this RDD.
@@ -612,65 +630,6 @@ def sampleVariance(self):
         """
         return self.stats().sampleVariance()

-    def _getBuckets(self, bucketCount):
-        #use the statscounter as a quick way of getting max and min
-        mm_stats = self.stats()
-        min = mm_stats.min()
-        max = mm_stats.max()
-
-        increment = (max-min)/bucketCount
-        buckets = range(min,min)
-        if increment != 0:
-            buckets = range(min,max, increment)
-
-        return {"min":min, "max":max, "buckets":buckets}
-
-    def histogram(self, bucketCount, buckets=None):
-        """
-        Compute a histogram of the data using bucketCount number of buckets
-        evenly spaced between the min and max of the RDD.
-
-        >>> sc.parallelize([1,49, 23, 100, 12, 13, 20, 22, 75, 50]).histogram(3)
-        defaultdict(<type 'int'>, {(67, inf): 2, (1, 33): 6, (34, 66): 2})
-        """
-        min = float("-inf")
-        max = float("inf")
-        evenBuckets = False
-        if not buckets:
-            b = self._getBuckets(bucketCount)
-            buckets = b["buckets"]
-            min = b["min"]
-            max = b["max"]
-
-        if len(buckets) < 2:
-            raise ValueError("requires more than 1 bucket")
-        if len(buckets) % 2 == 0:
-            evenBuckets = True
-        # histogram partition
-        def histogramPartition(iterator):
-            counters = defaultdict(int)
-            for obj in iterator:
-                k = bisect_right(buckets, obj)
-                if k < len(buckets) and k > 0:
-                    key = (buckets[k-1], buckets[k]-1)
-                elif k == len(buckets):
-                    key = (buckets[k-1], max)
-                elif k == 0:
-                    key = (min, buckets[k]-1)
-                counters[key] += 1
-            yield counters
-
-        # merge counters
-        def mergeCounters(d1, d2):
-            for k in d2.keys():
-                if k in d1:
-                    d1[k] += d2[k]
-            return d1
-
-        #map partitions(histogram_partition(bucketFunction)).reduce(mergeCounters)
-        return self.mapPartitions(histogramPartition).reduce(mergeCounters)
-
-
     def countByValue(self):
         """
         Return the count of each unique value in this RDD as a dictionary of

From a5c13b03c3e622315ee35b11b552366f7a48247a Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 13:30:12 -0700
Subject: [PATCH 07/12] Added min and max to Scala and Java RDD, added min and max to StatCounter

---
 .../org/apache/spark/api/java/JavaRDDLike.scala | 10 ++++++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  4 ++++
 .../org/apache/spark/util/StatCounter.scala     | 16 +++++++++++++++-
 .../org/apache/spark/PartitioningSuite.scala    |  2 ++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  2 ++
 python/pyspark/rdd.py                           |  1 -
 6 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3f49..a5abc6f3186ff 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -477,6 +477,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }

+  def max(comp: Comparator[T]): T = {
+    import scala.collection.JavaConversions._
+    rdd.max()(Ordering.comparatorToOrdering(comp))
+  }
+
+  def min(comp: Comparator[T]): T = {
+    import scala.collection.JavaConversions._
+    rdd.min()(Ordering.comparatorToOrdering(comp))
+  }
+
   /**
    * Returns the first K elements from this RDD using the
    * natural ordering for T while maintain the order.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0008..bf2851839017c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -958,6 +958,10 @@ abstract class RDD[T: ClassTag](
    */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

+  def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
+
+  def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index f837dc7ccc860..ed9dd4334e6fe 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
+  private var max_v: Double = 0  // Running max of our values
+  private var min_v: Double = 0  // Running min of our values

   merge(values)

@@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     n += 1
     mu += delta / n
     m2 += delta * (value - mu)
+    max_v = math.max(max_v, value)
+    min_v = math.min(min_v, value)
     this
   }

@@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
       if (n == 0) {
         mu = other.mu
         m2 = other.m2
-        n = other.n
+        n = other.n
+        max_v = other.max_v
+        min_v = other.min_v
       } else if (other.n != 0) {
         val delta = other.mu - mu
         if (other.n * 10 < n) {
@@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         }
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
+        max_v = math.max(max_v, other.max_v)
+        min_v = math.min(min_v, other.min_v)
       }
       this
     }
@@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     other.n = n
     other.mu = mu
     other.m2 = m2
+    other.max_v = max_v
+    other.min_v = min_v
     other
   }

@@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {

   def sum: Double = n * mu

+  def max: Double = max_v
+
+  def min: Double = min_v
+
   /** Return the variance of the values. */
   def variance: Double = {
     if (n == 0) {
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 4305686d3a6d5..567d6be108749 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
+    assert(abs(4.0 - stats.max) === 0)
+    assert(abs(-1.0 - stats.max) === 0)

     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 60bcada55245b..e9394bbe0541b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(nums.max() === 4)
+    assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 04ee60a57e4cd..5f4b87abc6d25 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -24,7 +24,6 @@
 import sys
 import shlex
 import traceback
-from bisect import bisect_right
 from subprocess import Popen, PIPE
 from tempfile import NamedTemporaryFile
 from threading import Thread

From 1a97558b7e8cdb6bb2dc06a815943192c3285acb Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 23:08:30 -0700
Subject: [PATCH 08/12] added max and min to StatCounter output, updated doc

---
 .../org/apache/spark/api/java/JavaRDDLike.scala    | 12 ++++++++++++
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++++--
 .../scala/org/apache/spark/util/StatCounter.scala  |  6 +++---
 .../scala/org/apache/spark/PartitioningSuite.scala |  4 ++--
 python/pyspark/rdd.py                              |  4 ++--
 5 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index a5abc6f3186ff..44a797d16c996 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -477,11 +477,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }

+  /**
+   * Returns the maximum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the maximum of the RDD
+   * */
   def max(comp: Comparator[T]): T = {
     import scala.collection.JavaConversions._
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }

+  /**
+   * Returns the minimum element from this RDD as defined by the specified
+   * Comparator[T].
+   * @param comp the comparator that defines ordering
+   * @return the minimum of the RDD
+   * */
   def min(comp: Comparator[T]): T = {
     import scala.collection.JavaConversions._
     rdd.min()(Ordering.comparatorToOrdering(comp))
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index bf2851839017c..f7baab83cc170 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -958,9 +958,17 @@ abstract class RDD[T: ClassTag](
    */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

-  def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
+  /**
+   * Returns the max of this RDD as defined by the implicit Ordering[T].
+   * @return the maximum element of the RDD
+   * */
+  def max()(implicit ord: Ordering[T]):T = this.reduce(ord.max)

-  def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}
+  /**
+   * Returns the min of this RDD as defined by the implicit Ordering[T].
+   * @return the minimum element of the RDD
+   * */
+  def min()(implicit ord: Ordering[T]):T = this.reduce(ord.min)

   /**
    * Save this RDD as a text file, using string representations of elements.
   */
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index ed9dd4334e6fe..752e96c2434f7 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-  private var max_v: Double = 0  // Running max of our values
-  private var min_v: Double = 0  // Running min of our values
+  private var max_v: Double = Double(-Infinity)  // Running max of our values
+  private var min_v: Double = Double(Infinity)  // Running min of our values

   merge(values)

@@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   def sampleStdev: Double = math.sqrt(sampleVariance)

   override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f)".format(count, mean, stdev)
+    "(count: %d, mean: %f, stdev: %f, max: %f, min: $f)".format(count, mean, stdev, max, min)
   }
 }

diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 567d6be108749..0ef07a180a468 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -171,8 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(abs(6.0/2 - rdd.mean) < 0.01)
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
-    assert(abs(4.0 - stats.max) === 0)
-    assert(abs(-1.0 - stats.max) === 0)
+    assert(stats.max === 4.0)
+    assert(stats.min === -1.0)

     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 5f4b87abc6d25..ec547c6d29c4c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -543,7 +543,7 @@ def max(self):
         >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
         43.0
         """
-        return self.stats().max()
+        return self.reduce(max)

     def min(self):
         """
@@ -552,7 +552,7 @@ def min(self):
         >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
         1.0
         """
-        return self.stats().min()
+        return self.reduce(min)

     def sum(self):
         """

From 21dd366744aa1a669db41dbdde56e5f347d9b6f6 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 23:15:45 -0700
Subject: [PATCH 09/12] added max and min to StatCounter output, updated doc

---
 core/src/main/scala/org/apache/spark/util/StatCounter.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 752e96c2434f7..99828780398d7 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-  private var max_v: Double = Double(-Infinity)  // Running max of our values
-  private var min_v: Double = Double(Infinity)  // Running min of our values
+  private var max_v: Double = Double.PositiveInfinity  // Running max of our values
+  private var min_v: Double = Double.NegativeInfinity  // Running min of our values

   merge(values)

From 5d96799ae2926254fbcf73e8041219f8adcb6502 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 23:17:37 -0700
Subject: [PATCH 10/12] added max and min to StatCounter repr for pyspark

---
 python/pyspark/statcounter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index ee75c414dcbf6..22848af0851cc 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -124,5 +124,5 @@ def sampleStdev(self):
         return math.sqrt(self.sampleVariance())

     def __repr__(self):
-        return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())
+        return "(count: %s, mean: %s, stdev: %s, max: %s, min: %s)" % (self.count(), self.mean(), self.stdev(), self.max(), self.min())

From 82cde0eb8082513b9e09a35b56dafe9223322c73 Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Fri, 14 Mar 2014 23:28:52 -0700
Subject: [PATCH 11/12] flipped incorrectly assigned inf values in StatCounter

---
 core/src/main/scala/org/apache/spark/util/StatCounter.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 99828780398d7..f3fc0ae415e00 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-  private var max_v: Double = Double.PositiveInfinity  // Running max of our values
-  private var min_v: Double = Double.NegativeInfinity  // Running min of our values
+  private var max_v: Double = Double.NegativeInfinity  // Running max of our values
+  private var min_v: Double = Double.PositiveInfinity  // Running min of our values

   merge(values)

From fd3fd4b427dd374ffe8a33ad65a3db401540cc9c Mon Sep 17 00:00:00 2001
From: Dan McClary
Date: Mon, 17 Mar 2014 02:26:29 -0700
Subject: [PATCH 12/12] fixed error, updated test

---
 .../apache/spark/api/java/JavaRDDLike.scala  |  2 --
 .../org/apache/spark/util/StatCounter.scala  | 26 +++++++++----------
 .../org/apache/spark/PartitioningSuite.scala |  2 +-
 python/pyspark/statcounter.py                | 24 ++++++++---------
 4 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 44a797d16c996..5680da15c2b27 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -484,7 +484,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * @return the maximum of the RDD
    * */
   def max(comp: Comparator[T]): T = {
-    import scala.collection.JavaConversions._
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }

@@ -495,7 +494,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * @return the minimum of the RDD
    * */
   def min(comp: Comparator[T]): T = {
-    import scala.collection.JavaConversions._
     rdd.min()(Ordering.comparatorToOrdering(comp))
   }

diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index f3fc0ae415e00..732748a7ff82b 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -29,8 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
-  private var max_v: Double = Double.NegativeInfinity  // Running max of our values
-  private var min_v: Double = Double.PositiveInfinity  // Running min of our values
+  private var maxValue: Double = Double.NegativeInfinity  // Running max of our values
+  private var minValue: Double = Double.PositiveInfinity  // Running min of our values

   merge(values)

@@ -43,8 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     n += 1
     mu += delta / n
     m2 += delta * (value - mu)
-    max_v = math.max(max_v, value)
-    min_v = math.min(min_v, value)
+    maxValue = math.max(maxValue, value)
+    minValue = math.min(minValue, value)
     this
   }

@@ -63,8 +63,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         mu = other.mu
         m2 = other.m2
         n = other.n
-        max_v = other.max_v
-        min_v = other.min_v
+        maxValue = other.maxValue
+        minValue = other.minValue
       } else if (other.n != 0) {
         val delta = other.mu - mu
         if (other.n * 10 < n) {
@@ -76,8 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
         }
         m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
         n += other.n
-        max_v = math.max(max_v, other.max_v)
-        min_v = math.min(min_v, other.min_v)
+        maxValue = math.max(maxValue, other.maxValue)
+        minValue = math.min(minValue, other.minValue)
       }
       this
     }
@@ -89,8 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
     other.n = n
     other.mu = mu
     other.m2 = m2
-    other.max_v = max_v
-    other.min_v = min_v
+    other.maxValue = maxValue
+    other.minValue = minValue
     other
   }

@@ -100,9 +100,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {

   def sum: Double = n * mu

-  def max: Double = max_v
+  def max: Double = maxValue

-  def min: Double = min_v
+  def min: Double = minValue

   /** Return the variance of the values. */
   def variance: Double = {
@@ -135,7 +135,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   def sampleStdev: Double = math.sqrt(sampleVariance)

   override def toString: String = {
-    "(count: %d, mean: %f, stdev: %f, max: %f, min: $f)".format(count, mean, stdev, max, min)
+    "(count: %d, mean: %f, stdev: %f, max: %f, min: %f)".format(count, mean, stdev, max, min)
   }
 }

diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 0ef07a180a468..996db70809320 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -172,7 +172,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(abs(1.0 - rdd.variance) < 0.01)
     assert(abs(1.0 - rdd.stdev) < 0.01)
     assert(stats.max === 4.0)
-    assert(stats.min === -1.0)
+    assert(stats.min === 2.0)

     // Add other tests here for classes that should be able to handle empty partitions correctly
   }
diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py
index 22848af0851cc..080325061a697 100644
--- a/python/pyspark/statcounter.py
+++ b/python/pyspark/statcounter.py
@@ -26,8 +26,8 @@ def __init__(self, values=[]):
         self.n = 0L    # Running count of our values
         self.mu = 0.0  # Running mean of our values
         self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)
-        self.max_v = float("-inf")
-        self.min_v = float("inf")
+        self.maxValue = float("-inf")
+        self.minValue = float("inf")

         for v in values:
             self.merge(v)
@@ -38,10 +38,10 @@ def merge(self, value):
         self.n += 1
         self.mu += delta / self.n
         self.m2 += delta * (value - self.mu)
-        if self.max_v < value:
-            self.max_v = value
-        if self.min_v > value:
-            self.min_v = value
+        if self.maxValue < value:
+            self.maxValue = value
+        if self.minValue > value:
+            self.minValue = value

         return self

@@ -57,8 +57,8 @@ def mergeStats(self, other):
                 self.mu = other.mu
                 self.m2 = other.m2
                 self.n = other.n
-                self.max_v = other.max_v
-                self.min_v = other.min_v
+                self.maxValue = other.maxValue
+                self.minValue = other.minValue

             elif other.n != 0:
                 delta = other.mu - self.mu
@@ -69,8 +69,8 @@ def mergeStats(self, other):
                 else:
                     self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)

-                self.max_v = max(self.max_v, other.max_v)
-                self.min_v = min(self.min_v, other.min_v)
+                self.maxValue = max(self.maxValue, other.maxValue)
+                self.minValue = min(self.minValue, other.minValue)
                 self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                 self.n += other.n

@@ -90,10 +90,10 @@ def sum(self):
         return self.n * self.mu

     def min(self):
-        return self.min_v
+        return self.minValue

     def max(self):
-        return self.max_v
+        return self.maxValue

     # Return the variance of the values.
     def variance(self):
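
For reference, the bucketing approach from patches 01-04 (assign each value to a boundary pair with bisect_right, count per partition, then merge the per-partition counters) can be exercised locally with plain Python. This is a sketch of that logic, not the PySpark API; the helper names assign_bucket and merge_counters are illustrative only, and the reducer assumes d1 is a defaultdict(int) so that keys seen only in d2 are kept (the patch's mergeCounters silently dropped those).

    from bisect import bisect_right
    from collections import defaultdict

    def assign_bucket(value, buckets, lo, hi):
        # Mirror of histogramPartition's key logic: find the boundary to the
        # right of the value, then emit an inclusive (lower, upper) bucket key.
        k = bisect_right(buckets, value)
        if 0 < k < len(buckets):
            return (buckets[k - 1], buckets[k] - 1)
        elif k == len(buckets):
            return (buckets[-1], hi)      # value at or beyond the last boundary
        else:
            return (lo, buckets[0] - 1)   # value below the first boundary

    def merge_counters(d1, d2):
        # Combine two per-partition counters into one.
        for key, count in d2.items():
            d1[key] += count
        return d1

    data = [1, 49, 23, 100, 12, 13, 20, 22, 75, 50]
    # Boundaries as the patch's _getBuckets would derive them for this data:
    # increment = (100 - 1) / 3 = 33 (integer division), buckets = range(1, 100, 33).
    buckets = range(1, 100, 33)

    # Simulate two partitions and merge their counters, the way
    # mapPartitions(histogramPartition).reduce(mergeCounters) would.
    counters = []
    for part in (data[:5], data[5:]):
        c = defaultdict(int)
        for x in part:
            c[assign_bucket(x, buckets, float("-inf"), float("inf"))] += 1
        counters.append(c)

    print(dict(merge_counters(counters[0], counters[1])))
    # {(1, 33): 6, (34, 66): 2, (67, inf): 2} -- the same counts as the doctest added in patch 02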
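
Patches 01, 07, and 12 all follow the same running-aggregate idea in StatCounter: keep count, mean, and the m2 variance numerator (Welford's update) plus a running max and min, and combine two partial counters in mergeStats. A minimal local Python sketch of that merge follows; the class name RunningStats is made up for illustration, and the small-count/large-count branches of mergeStats are collapsed into the general mean formula.

    class RunningStats(object):
        # Running count/mean/m2 plus max and min, mirroring pyspark's StatCounter fields.
        def __init__(self, values=()):
            self.n = 0
            self.mu = 0.0
            self.m2 = 0.0
            self.max_v = float("-inf")   # seeded so any real value replaces it
            self.min_v = float("inf")
            for v in values:
                self.merge(v)

        def merge(self, value):
            # Welford update for one value, plus the running extremes.
            delta = value - self.mu
            self.n += 1
            self.mu += delta / self.n
            self.m2 += delta * (value - self.mu)
            self.max_v = max(self.max_v, value)
            self.min_v = min(self.min_v, value)
            return self

        def merge_stats(self, other):
            # Combine two partial counters, as mergeStats does across partitions.
            if self.n == 0:
                self.n, self.mu, self.m2 = other.n, other.mu, other.m2
                self.max_v, self.min_v = other.max_v, other.min_v
            elif other.n != 0:
                delta = other.mu - self.mu
                self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)
                self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                self.n += other.n
                self.max_v = max(self.max_v, other.max_v)
                self.min_v = min(self.min_v, other.min_v)
            return self

    # Merging per-partition counters gives the same result as one pass over all the data,
    # which is what lets max/min/mean come out of a reduce over partitions.
    left, right = RunningStats([1.0, 5.0]), RunningStats([43.0, 10.0])
    combined = left.merge_stats(right)
    print(combined.max_v, combined.min_v, combined.mu)   # 43.0 1.0 14.75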