Changes from all commits (33 commits)
1441977
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
dorx May 29, 2014
ffea61a
SPARK-1939: Refactor takeSample method in RDD
dorx May 30, 2014
7cab53a
fixed import bug in rdd.py
dorx Jun 2, 2014
e3fd6a6
Merge branch 'master' into takeSample
dorx Jun 2, 2014
9ee94ee
[SPARK-2082] stratified sampling in PairRDDFunctions that guarantees …
dorx Jun 9, 2014
1d413ce
fixed checkstyle issues
dorx Jun 9, 2014
7e1a481
changed the permission on SamplingUtil
dorx Jun 9, 2014
46f6c8c
fixed the NPE caused by closures being cleaned before being passed in…
dorx Jun 10, 2014
50581fc
added a TODO for logging in python
dorx Jun 12, 2014
7327611
merge master
dorx Jun 13, 2014
9e74ab5
Separated out most of the logic in sampleByKey
dorx Jun 17, 2014
90d94c0
merge master
dorx Jun 17, 2014
0214a76
cleanUp
dorx Jun 18, 2014
944a10c
[SPARK-2145] Add lower bound on sampling rate
dorx Jun 19, 2014
1fe1cff
Changed fractionByKey to a map to enable arg check
dorx Jun 19, 2014
bd9dc6e
unit bug and style violation fixed
dorx Jun 19, 2014
4ad516b
remove unused imports from PairRDDFunctions
dorx Jun 20, 2014
254e03c
minor fixes and Java API.
dorx Jul 3, 2014
6b5b10b
Merge branch 'master' into stratified
dorx Jul 3, 2014
ee9d260
addressed reviewer comments
dorx Jul 6, 2014
bbfb8c9
Merge branch 'master' into stratified
dorx Jul 6, 2014
9884a9f
style fix
dorx Jul 8, 2014
680b677
use mapPartitionWithIndex instead
dorx Jul 8, 2014
a2bf756
Merge branch 'master' into stratified
dorx Jul 8, 2014
a10e68d
style fix
dorx Jul 9, 2014
f4c21f3
Reviewer comments
dorx Jul 15, 2014
eecee5f
Merge branch 'master' into stratified
dorx Jul 15, 2014
b3013a4
move math3 back to test scope
mengxr Jul 25, 2014
b223529
use approx bounds for poisson
mengxr Jul 25, 2014
ea7d27f
merge master
dorx Jul 28, 2014
17a381b
fixed a merge issue and a failed unit
dorx Jul 28, 2014
eaf5771
bug fixes.
dorx Jul 28, 2014
245439e
moved minSamplingRate to getUpperBound
dorx Jul 29, 2014
69 changes: 68 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.java

-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, List => JList, Map => JMap}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
@@ -129,6 +129,73 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map.
   *
   * If `exact` is set to false, create the sample via simple random sampling, with one pass
   * over the RDD, to produce a sample whose size approximately equals the sum of
   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes
   * over the RDD to create a sample whose size exactly equals that sum.
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: JMap[K, Double],
      exact: Boolean,
      seed: Long): JavaPairRDD[K, V] =
    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map.
   *
   * If `exact` is set to false, create the sample via simple random sampling, with one pass
   * over the RDD, to produce a sample whose size approximately equals the sum of
   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes
   * over the RDD to create a sample whose size exactly equals that sum.
   *
   * Uses Utils.random.nextLong as the seed for the random number generator.
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: JMap[K, Double],
      exact: Boolean): JavaPairRDD[K, V] =
    sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map.
   *
   * Produce a sample whose size approximately equals the sum of
   * math.ceil(numItems * samplingRate) over all key values, with one pass over the RDD via
   * simple random sampling.
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: JMap[K, Double],
      seed: Long): JavaPairRDD[K, V] =
    sampleByKey(withReplacement, fractions, false, seed)

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map.
   *
   * Produce a sample whose size approximately equals the sum of
   * math.ceil(numItems * samplingRate) over all key values, with one pass over the RDD via
   * simple random sampling.
   *
   * Uses Utils.random.nextLong as the seed for the random number generator.
   */
  def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
    sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)
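For reference, here is a minimal usage sketch of these overloads, written in Scala against the signatures above. The `pairs` RDD and the chosen rates are illustrative, not part of this patch:

```scala
import java.util.{HashMap => JHashMap}

// `pairs` stands in for an existing JavaPairRDD[String, Integer].
val fractions = new JHashMap[String, Double]()
fractions.put("a", 0.1) // keep ~10% of records with key "a"
fractions.put("b", 0.6) // keep ~60% of records with key "b"

// One pass over the RDD, approximate per-key sample sizes, fixed seed.
val approx = pairs.sampleByKey(false, fractions, 42L)

// Extra pass(es) over the RDD, exact per-key sample sizes (exact = true).
val exact = pairs.sampleByKey(false, fractions, true, 42L)
```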

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
54 changes: 45 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -19,12 +19,10 @@ package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
-import java.util.Date
-import java.util.{HashMap => JHashMap}
+import java.util.{Date, HashMap => JHashMap}

+import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
-import scala.collection.Map
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

@@ -34,19 +34,19 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

import org.apache.spark._
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.random.StratifiedSamplingUtils

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -195,6 +193,41 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
foldByKey(zeroValue, defaultPartitioner(self))(func)
}

  /**
   * Return a subset of this RDD sampled by key (via stratified sampling).
   *
   * Create a sample of this RDD using variable sampling rates for different keys as specified by
   * `fractions`, a key to sampling rate map.
   *
   * If `exact` is set to false, create the sample via simple random sampling, with one pass
   * over the RDD, to produce a sample whose size approximately equals the sum of
   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes
   * over the RDD to create a sample whose size exactly equals that sum, with 99.99% confidence.
   * When sampling without replacement, we need one additional pass over the RDD to guarantee
   * sample size; when sampling with replacement, we need two additional passes.
   *
   * @param withReplacement whether to sample with or without replacement
   * @param fractions map of specific keys to sampling rates
   * @param seed seed for the random number generator
   * @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
   * @return RDD containing the sampled subset
   */
  def sampleByKey(withReplacement: Boolean,
      fractions: Map[K, Double],
      exact: Boolean = false,
      seed: Long = Utils.random.nextLong): RDD[(K, V)] = {

    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

    val samplingFunc = if (withReplacement) {
      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, exact, seed)
    } else {
      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, exact, seed)
    }
    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
  }
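As a quick illustration of the new Scala API, a minimal sketch; it assumes a live SparkContext `sc` (and, in Spark 1.x, `import org.apache.spark.SparkContext._` for the implicit conversion to PairRDDFunctions). The data and seed are made up:

```scala
val data = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("c", 5)))
val fractions = Map("a" -> 0.5, "b" -> 0.5, "c" -> 1.0)

// Default (exact = false): one pass, per-key sample sizes are only
// approximately math.ceil(numItems * samplingRate).
val approxSample = data.sampleByKey(false, fractions)

// exact = true buys exact per-key sizes at the cost of additional pass(es).
val exactSample = data.sampleByKey(false, fractions, exact = true, seed = 11L)
```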

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
@@ -531,6 +564,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   *
   * Warning: this doesn't return a multimap (so if you have multiple values for the same key,
   * only one value per key is preserved in the map returned).
   */
  def collectAsMap(): Map[K, V] = {
    val data = self.collect()
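The new warning on collectAsMap is easy to trip over, so here is a tiny sketch of the collapsing behavior (again assuming a SparkContext `sc`):

```scala
val m = sc.parallelize(Seq((1, "x"), (1, "y"), (2, "z"))).collectAsMap()
// m is a Map, not a multimap: only one of "x"/"y" survives for key 1,
// e.g. Map(2 -> "z", 1 -> "y"). Use groupByKey first if you need all values.
```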
core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -81,21 +81,83 @@ private[spark] object SamplingUtils {
   * ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success
   * rate, where success rate is defined the same as in sampling with replacement.
   *
   * The smallest sampling rate supported is 1e-10 (in order to avoid running into the limit of
   * the RNG's resolution).
   *
   * @param sampleSizeLowerBound sample size
   * @param total size of RDD
   * @param withReplacement whether sampling with replacement
   * @return a sampling rate that guarantees sufficient sample size with 99.99% success rate
   */
  def computeFractionForSampleSize(sampleSizeLowerBound: Int, total: Long,
      withReplacement: Boolean): Double = {
-   val fraction = sampleSizeLowerBound.toDouble / total
    if (withReplacement) {
-     val numStDev = if (sampleSizeLowerBound < 12) 9 else 5
-     fraction + numStDev * math.sqrt(fraction / total)
+     PoissonBounds.getUpperBound(sampleSizeLowerBound) / total
    } else {
-     val delta = 1e-4
-     val gamma = - math.log(delta) / total
-     math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction))
+     val fraction = sampleSizeLowerBound.toDouble / total
+     BinomialBounds.getUpperBound(1e-4, total, fraction)
    }
  }
}
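To make the rewritten computeFractionForSampleSize concrete, a worked sketch with illustrative numbers (note the object is private[spark], so this only compiles inside Spark's own packages):

```scala
// Without replacement: to draw at least 100 items from 1,000,000 with 99.99%
// confidence, the adjusted rate lands slightly above the naive 100 / 1e6 = 1e-4.
val f = SamplingUtils.computeFractionForSampleSize(100, 1000000L, withReplacement = false)

// With replacement: the rate is PoissonBounds.getUpperBound(100) / total
// = (100 + 6 * math.sqrt(100)) / 1000000 = 1.6e-4.
val fRep = SamplingUtils.computeFractionForSampleSize(100, 1000000L, withReplacement = true)
```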

/**
 * Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
 * sample sizes with high confidence when sampling with replacement.
 */
private[spark] object PoissonBounds {

  /**
   * Returns a lambda such that Pr[X > s] is very small, where X ~ Pois(lambda).
   *
   * @param s sample size
   */
  def getLowerBound(s: Double): Double = {
    math.max(s - numStd(s) * math.sqrt(s), 1e-15)
  }

  /**
   * Returns a lambda such that Pr[X < s] is very small, where X ~ Pois(lambda).
   *
   * @param s sample size
   */
  def getUpperBound(s: Double): Double = {
    math.max(s + numStd(s) * math.sqrt(s), 1e-10)
  }

  private def numStd(s: Double): Double = {
    // TODO: Make it tighter.
    if (s < 6.0) {
      12.0
    } else if (s < 16.0) {
      9.0
    } else {
      6.0
    }
  }
}
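A worked example of these bounds, just applying the numStd table as written:

```scala
// For a target size s = 100, numStd(100.0) = 6.0, so:
PoissonBounds.getLowerBound(100.0) // 100 - 6 * math.sqrt(100) = 40.0
PoissonBounds.getUpperBound(100.0) // 100 + 6 * math.sqrt(100) = 160.0
// Sampling with lambda = 160 makes Pr[X < 100] very small for X ~ Pois(160),
// which is what gives the exact-size code path enough candidates per key.
```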

/**
 * Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
 * sample size with high confidence when sampling without replacement.
 */
private[spark] object BinomialBounds {

  val minSamplingRate = 1e-10

  /**
   * Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
   * it is very unlikely to have more than `fraction * n` successes.
   */
  def getLowerBound(delta: Double, n: Long, fraction: Double): Double = {
    val gamma = - math.log(delta) / n * (2.0 / 3.0)
    fraction + gamma - math.sqrt(gamma * gamma + 3 * gamma * fraction)
  }

  /**
   * Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
   * it is very unlikely to have fewer than `fraction * n` successes.
   */
  def getUpperBound(delta: Double, n: Long, fraction: Double): Double = {
    val gamma = - math.log(delta) / n
    math.min(1,
      math.max(minSamplingRate, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)))
  }
}
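And the analogous check for BinomialBounds, again just arithmetic on the formulas above:

```scala
// n = 1,000,000 Bernoulli trials, target fraction = 0.01, delta = 1e-4:
val lo = BinomialBounds.getLowerBound(1e-4, 1000000L, 0.01) // a bit below 0.01
val hi = BinomialBounds.getUpperBound(1e-4, 1000000L, 0.01) // a bit above 0.01
// Both thresholds tighten toward 0.01 as n grows; hi is additionally clamped
// into [minSamplingRate, 1] by the min/max in getUpperBound.
```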