
Commit 4201ddc

[SPARK-17768][CORE] Small (Sum,Count,Mean)Evaluator problems and suboptimalities
## What changes were proposed in this pull request?

Fix:

- GroupedMeanEvaluator and GroupedSumEvaluator are unused, as is the StudentTCacher support class
- CountEvaluator can return a lower bound < 0, when counts can't be negative
- MeanEvaluator will actually fail on exactly 1 datum (yields a t-test with 0 degrees of freedom)
- CountEvaluator uses a normal distribution, which may be an inappropriate approximation (leading to the above)
- The test for SumEvaluator asserts incorrect expected sums – e.g. after observing that 10% of the data has a sum of 2, the expectation should be 20, not 38
- CountEvaluator and MeanEvaluator have no unit tests to catch these
- Distribution code is duplicated across CountEvaluator and GroupedCountEvaluator
- The stats in each could use a bit of documentation, as I had to guess at them
- (The code could use a few cleanups and optimizations too)

## How was this patch tested?

Existing and new tests.

Author: Sean Owen <[email protected]>

Closes #15341 from srowen/SPARK-17768.
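As a quick sanity check of the SumEvaluator bullet above, here is the corrected expectation worked out with the hypothetical values from that bullet (illustrative only, not code from the patch):

```scala
// After observing a fraction p = 0.1 of the data with an observed sum of 2,
// the natural extrapolation of the total sum is sum / p:
val p = 0.1
val observedSum = 2.0
val expectedTotal = observedSum / p  // 20.0 -- the value the test should assert, not 38
```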
1 parent 362ba4b commit 4201ddc

File tree

10 files changed: +203 -332 lines changed

core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala

Lines changed: 36 additions & 17 deletions
```diff
@@ -17,40 +17,59 @@
 
 package org.apache.spark.partial
 
-import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.distribution.{PascalDistribution, PoissonDistribution}
 
 /**
  * An ApproximateEvaluator for counts.
- *
- * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
- * be best to make this a special case of GroupedCountEvaluator with one group.
  */
 private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
   extends ApproximateEvaluator[Long, BoundedDouble] {
 
-  var outputsMerged = 0
-  var sum: Long = 0
+  private var outputsMerged = 0
+  private var sum: Long = 0
 
-  override def merge(outputId: Int, taskResult: Long) {
+  override def merge(outputId: Int, taskResult: Long): Unit = {
     outputsMerged += 1
     sum += taskResult
   }
 
   override def currentResult(): BoundedDouble = {
     if (outputsMerged == totalOutputs) {
       new BoundedDouble(sum, 1.0, sum, sum)
-    } else if (outputsMerged == 0) {
-      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+    } else if (outputsMerged == 0 || sum == 0) {
+      new BoundedDouble(0, 0.0, 0.0, Double.PositiveInfinity)
     } else {
       val p = outputsMerged.toDouble / totalOutputs
-      val mean = (sum + 1 - p) / p
-      val variance = (sum + 1) * (1 - p) / (p * p)
-      val stdev = math.sqrt(variance)
-      val confFactor = new NormalDistribution().
-        inverseCumulativeProbability(1 - (1 - confidence) / 2)
-      val low = mean - confFactor * stdev
-      val high = mean + confFactor * stdev
-      new BoundedDouble(mean, confidence, low, high)
+      CountEvaluator.bound(confidence, sum, p)
     }
   }
 }
+
+private[partial] object CountEvaluator {
+
+  def bound(confidence: Double, sum: Long, p: Double): BoundedDouble = {
+    // Let the total count be N. A fraction p has been counted already, with sum 'sum',
+    // as if each element from the total data set had been seen with probability p.
+    val dist =
+      if (sum <= 10000) {
+        // The remaining count, k=N-sum, may be modeled as negative binomial (aka Pascal),
+        // where there have been 'sum' successes of probability p already. (There are several
+        // conventions, but this is the one followed by Commons Math3.)
+        new PascalDistribution(sum.toInt, p)
+      } else {
+        // For large 'sum' (certainly, > Int.MaxValue!), use a Poisson approximation, which has
+        // a different interpretation. "sum" elements have been observed having scanned a fraction
+        // p of the data. This suggests data is counted at a rate of sum / p across the whole data
+        // set. The total expected count from the rest is distributed as
+        // (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p)
+        new PoissonDistribution(sum * (1 - p) / p)
+      }
+    // Not quite symmetric; calculate interval straight from discrete distribution
+    val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
+    val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
+    // Add 'sum' to each because distribution is just of remaining count, not observed
+    new BoundedDouble(sum + dist.getNumericalMean, confidence, sum + low, sum + high)
+  }
+
+
+}
```
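For readers skimming the diff, the new bounding logic can be exercised on its own. A minimal sketch, assuming only commons-math3 on the classpath (the helper name and tuple return type are illustrative; the real code returns a BoundedDouble):

```scala
import org.apache.commons.math3.distribution.{PascalDistribution, PoissonDistribution}

// Given 'sum' elements counted after seeing a fraction p of the data,
// bound the total count at the given confidence.
def approxCountBound(confidence: Double, sum: Long, p: Double): (Double, Long, Long) = {
  val dist =
    if (sum <= 10000) {
      // Remaining count modeled as negative binomial: 'sum' successes of probability p
      new PascalDistribution(sum.toInt, p)
    } else {
      // Poisson approximation of the remaining count, with mean sum * (1 - p) / p
      new PoissonDistribution(sum * (1 - p) / p)
    }
  val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
  val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
  // The distribution covers only the unseen remainder, so shift everything by 'sum'
  (sum + dist.getNumericalMean, sum + low, sum + high)
}

// e.g. 2 elements counted after 10% of the data: Pascal mean = 2 * 0.9 / 0.1 = 18,
// so approxCountBound(0.95, 2L, 0.1) gives a mean of 20, with a wide interval.
```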

core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala

Lines changed: 5 additions & 25 deletions
```diff
@@ -17,15 +17,10 @@
 
 package org.apache.spark.partial
 
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
-import org.apache.commons.math3.distribution.NormalDistribution
-
 import org.apache.spark.util.collection.OpenHashMap
 
 /**
@@ -34,10 +29,10 @@ import org.apache.spark.util.collection.OpenHashMap
 private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
   extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] {
 
-  var outputsMerged = 0
-  var sums = new OpenHashMap[T, Long]() // Sum of counts for each key
+  private var outputsMerged = 0
+  private val sums = new OpenHashMap[T, Long]() // Sum of counts for each key
 
-  override def merge(outputId: Int, taskResult: OpenHashMap[T, Long]) {
+  override def merge(outputId: Int, taskResult: OpenHashMap[T, Long]): Unit = {
     outputsMerged += 1
     taskResult.foreach { case (key, value) =>
       sums.changeValue(key, value, _ + value)
@@ -46,27 +41,12 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf
 
   override def currentResult(): Map[T, BoundedDouble] = {
     if (outputsMerged == totalOutputs) {
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      sums.foreach { case (key, sum) =>
-        result.put(key, new BoundedDouble(sum, 1.0, sum, sum))
-      }
-      result.asScala
+      sums.map { case (key, sum) => (key, new BoundedDouble(sum, 1.0, sum, sum)) }.toMap
     } else if (outputsMerged == 0) {
       new HashMap[T, BoundedDouble]
     } else {
       val p = outputsMerged.toDouble / totalOutputs
-      val confFactor = new NormalDistribution().
-        inverseCumulativeProbability(1 - (1 - confidence) / 2)
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      sums.foreach { case (key, sum) =>
-        val mean = (sum + 1 - p) / p
-        val variance = (sum + 1) * (1 - p) / (p * p)
-        val stdev = math.sqrt(variance)
-        val low = mean - confFactor * stdev
-        val high = mean + confFactor * stdev
-        result.put(key, new BoundedDouble(mean, confidence, low, high))
-      }
-      result.asScala
+      sums.map { case (key, sum) => (key, CountEvaluator.bound(confidence, sum, p)) }.toMap
     }
   }
 }
```
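These evaluators sit behind Spark's public approximate-count APIs: CountEvaluator backs RDD.countApprox, and GroupedCountEvaluator backs the per-key variants such as countByValueApprox. A hedged usage sketch in local mode (the timeout and confidence values are arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("approx-demo").setMaster("local[*]"))
val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

// Ask for a count within 2 seconds at 95% confidence; the result is a
// PartialResult[BoundedDouble] whose bounds come from the code above.
val partialCount = rdd.countApprox(timeout = 2000, confidence = 0.95)
println(partialCount.initialValue)  // BoundedDouble: mean with [low, high] bounds

// Per-key variant, bounded key by key via CountEvaluator.bound
val partialByValue = rdd.map(_ % 10).countByValueApprox(timeout = 2000, confidence = 0.95)
println(partialByValue.initialValue)  // Map of value -> BoundedDouble

sc.stop()
```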

core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala

Lines changed: 0 additions & 80 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala

Lines changed: 0 additions & 88 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala

Lines changed: 14 additions & 9 deletions
```diff
@@ -27,30 +27,35 @@ import org.apache.spark.util.StatCounter
 private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
   extends ApproximateEvaluator[StatCounter, BoundedDouble] {
 
-  var outputsMerged = 0
-  var counter = new StatCounter
+  private var outputsMerged = 0
+  private val counter = new StatCounter()
 
-  override def merge(outputId: Int, taskResult: StatCounter) {
+  override def merge(outputId: Int, taskResult: StatCounter): Unit = {
     outputsMerged += 1
     counter.merge(taskResult)
   }
 
   override def currentResult(): BoundedDouble = {
     if (outputsMerged == totalOutputs) {
       new BoundedDouble(counter.mean, 1.0, counter.mean, counter.mean)
-    } else if (outputsMerged == 0) {
+    } else if (outputsMerged == 0 || counter.count == 0) {
       new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+    } else if (counter.count == 1) {
+      new BoundedDouble(counter.mean, confidence, Double.NegativeInfinity, Double.PositiveInfinity)
     } else {
       val mean = counter.mean
       val stdev = math.sqrt(counter.sampleVariance / counter.count)
-      val confFactor = {
-        if (counter.count > 100) {
-          new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2)
+      val confFactor = if (counter.count > 100) {
+          // For large n, the normal distribution is a good approximation to t-distribution
+          new NormalDistribution().inverseCumulativeProbability((1 + confidence) / 2)
         } else {
+          // t-distribution describes distribution of actual population mean
+          // note that if this goes to 0, TDistribution will throw an exception.
+          // Hence special casing 1 above.
           val degreesOfFreedom = (counter.count - 1).toInt
-          new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
+          new TDistribution(degreesOfFreedom).inverseCumulativeProbability((1 + confidence) / 2)
         }
-      }
+      // Symmetric, so confidence interval is symmetric about mean of distribution
       val low = mean - confFactor * stdev
       val high = mean + confFactor * stdev
       new BoundedDouble(mean, confidence, low, high)
```
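The interval logic in isolation: for n observations with sample mean m and sample variance s², the two-sided interval is m ± q·√(s²/n), where q is the (1 + confidence)/2 quantile of t(n−1), or of N(0, 1) once n is large enough that the two nearly coincide. A sketch assuming only commons-math3 (the function name and signature are illustrative):

```scala
import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution}

def meanInterval(n: Long, mean: Double, sampleVariance: Double, confidence: Double): (Double, Double) = {
  // A t-based interval needs n >= 2, hence the count == 1 special case in the diff above
  require(n >= 2, "need at least 2 observations")
  val stdev = math.sqrt(sampleVariance / n)
  val confFactor = if (n > 100) {
    // Large n: normal approximation to the t-distribution
    new NormalDistribution().inverseCumulativeProbability((1 + confidence) / 2)
  } else {
    // Small n: t-distribution with n - 1 degrees of freedom
    new TDistribution((n - 1).toDouble).inverseCumulativeProbability((1 + confidence) / 2)
  }
  (mean - confFactor * stdev, mean + confFactor * stdev)
}
```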

core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala

Lines changed: 0 additions & 46 deletions
This file was deleted.
