Skip to content

Commit ee9d260

Browse files
committed
addressed reviewer comments
1 parent 6b5b10b commit ee9d260

File tree

4 files changed

+122
-86
lines changed

4 files changed

+122
-86
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -212,18 +212,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
212212
exact: Boolean = true,
213213
seed: Long = Utils.random.nextLong): RDD[(K, V)]= {
214214

215-
require(fractions.forall({case(k, v) => v >= 0.0}), "Invalid sampling rates.")
215+
require(fractions.forall {case(k, v) => v >= 0.0}, "Invalid sampling rates.")
216216

217-
if (withReplacement) {
217+
val samplingFunc = if (withReplacement) {
218218
val counts = if (exact) Some(this.countByKey()) else None
219-
val samplingFunc =
220219
StratifiedSampler.getPoissonSamplingFunction(self, fractions, exact, counts, seed)
221-
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
222220
} else {
223-
val samplingFunc =
224221
StratifiedSampler.getBernoulliSamplingFunction(self, fractions, exact, seed)
225-
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
226222
}
223+
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning=true)
227224
}
228225

229226
/**

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,6 @@ abstract class RDD[T: ClassTag](
350350

351351
/**
352352
* Return a sampled subset of this RDD.
353-
*
354353
*/
355354
def sample(withReplacement: Boolean,
356355
fraction: Double,

core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private[spark] object SamplingUtils {
7272
private[spark] object PoissonBounds {
7373

7474
val delta = 1e-4 / 3.0
75+
val epsilon = 1e-15
7576

7677
/**
7778
* Compute the threshold for accepting items on the fly. The threshold value is a fairly small
@@ -87,7 +88,7 @@ private[spark] object PoissonBounds {
8788
var ub = s
8889
while (lb < ub - 1.0) {
8990
val m = (lb + ub) / 2.0
90-
val poisson = new PoissonDistribution(m, 1e-15)
91+
val poisson = new PoissonDistribution(m, epsilon)
9192
val y = poisson.inverseCumulativeProbability(1 - delta)
9293
if (y > s) ub = m else lb = m
9394
}
@@ -96,7 +97,7 @@ private[spark] object PoissonBounds {
9697

9798
def getMinCount(lmbd: Double): Double = {
9899
if (lmbd == 0) return 0
99-
val poisson = new PoissonDistribution(lmbd, 1e-15)
100+
val poisson = new PoissonDistribution(lmbd, epsilon)
100101
poisson.inverseCumulativeProbability(delta)
101102
}
102103

@@ -114,7 +115,7 @@ private[spark] object PoissonBounds {
114115
var ub = s + math.sqrt(s / delta) // Chebyshev's inequality
115116
while (lb < ub - 1.0) {
116117
val m = (lb + ub) / 2.0
117-
val poisson = new PoissonDistribution(m, 1e-15)
118+
val poisson = new PoissonDistribution(m, epsilon)
118119
val y = poisson.inverseCumulativeProbability(delta)
119120
if (y >= s) ub = m else lb = m
120121
}

0 commit comments

Comments
 (0)