From 69400105159f64f0672da896c313e2a22525d219 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 17 Jul 2014 22:00:13 -0700 Subject: [PATCH 01/10] Reservoir sampling implementation. --- .../spark/util/random/SamplingUtils.scala | 40 +++++++++++++++++++ .../util/random/SamplingUtilsSuite.scala | 21 ++++++++++ 2 files changed, 61 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala index a79e3ee756fc..226cb11805db 100644 --- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala @@ -17,8 +17,48 @@ package org.apache.spark.util.random +import scala.reflect.ClassTag + private[spark] object SamplingUtils { + /** + * Reservoir Sampling implementation. + * + * @param input input size + * @param k reservoir size + * @return (samples, input size) + */ + def reservoirSample[T: ClassTag](input: Iterator[T], k: Int): (Array[T], Int) = { + val reservoir = new Array[T](k) + // Put the first k elements in the reservoir. + var i = 0 + while (i < k && input.hasNext) { + val item = input.next() + reservoir(i) = item + i += 1 + } + + // If we have consumed all the elements, return them. Otherwise do the replacement. + if (i < k) { + // If input size < k, trim the array to return only an array of input size. + val trimReservoir = new Array[T](i) + System.arraycopy(reservoir, 0, trimReservoir, 0, i) + (trimReservoir, i) + } else { + // If input size > k, continue the sampling process. + val rand = new XORShiftRandom + while (input.hasNext) { + val item = input.next() + val replacementIndex = rand.nextInt(i) + if (replacementIndex < k) { + reservoir(replacementIndex) = item + } + i += 1 + } + (reservoir, i) + } + } + /** * Returns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of * the time. diff --git a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala index accfe2e9b7f2..b1b1f1e28b45 100644 --- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala @@ -17,11 +17,32 @@ package org.apache.spark.util.random +import scala.util.Random + import org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistribution} import org.scalatest.FunSuite class SamplingUtilsSuite extends FunSuite { + test("reservoirSample") { + val input = Seq.fill(100)(Random.nextInt()) + + // input size < k + val (sample1, count1) = SamplingUtils.reservoirSample(input.iterator, 150) + assert(count1 === 100) + assert(input === sample1.toSeq) + + // input size == k + val (sample2, count2) = SamplingUtils.reservoirSample(input.iterator, 100) + assert(count2 === 100) + assert(input === sample2.toSeq) + + // input size > k + val (sample3, count3) = SamplingUtils.reservoirSample(input.iterator, 10) + assert(count3 === 100) + assert(sample3.length === 10) + } + test("computeFraction") { // test that the computed fraction guarantees enough data points // in the sample with a failure rate <= 0.0001 From badf20ded132d985f6d12000a876316af7287877 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 17 Jul 2014 22:29:53 -0700 Subject: [PATCH 02/10] Renamed the method. 
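The new name makes it explicit that the method returns both the sample and the
number of items it consumed from the iterator. Illustrative usage (not part of
this patch; the values are arbitrary):

    val (sample, count) = SamplingUtils.reservoirSampleAndCount((1 to 1000).iterator, k = 10)
    // sample is an Array[Int] of length 10; count === 1000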
--- .../org/apache/spark/util/random/SamplingUtils.scala | 4 ++-- .../org/apache/spark/util/random/SamplingUtilsSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala index 226cb11805db..44dc097eb988 100644 --- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala @@ -22,13 +22,13 @@ import scala.reflect.ClassTag private[spark] object SamplingUtils { /** - * Reservoir Sampling implementation. + * Reservoir sampling implementation that also returns the input size. * * @param input input size * @param k reservoir size * @return (samples, input size) */ - def reservoirSample[T: ClassTag](input: Iterator[T], k: Int): (Array[T], Int) = { + def reservoirSampleAndCount[T: ClassTag](input: Iterator[T], k: Int): (Array[T], Int) = { val reservoir = new Array[T](k) // Put the first k elements in the reservoir. var i = 0 diff --git a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala index b1b1f1e28b45..73a9d029b024 100644 --- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala @@ -24,21 +24,21 @@ import org.scalatest.FunSuite class SamplingUtilsSuite extends FunSuite { - test("reservoirSample") { + test("reservoirSampleAndCount") { val input = Seq.fill(100)(Random.nextInt()) // input size < k - val (sample1, count1) = SamplingUtils.reservoirSample(input.iterator, 150) + val (sample1, count1) = SamplingUtils.reservoirSampleAndCount(input.iterator, 150) assert(count1 === 100) assert(input === sample1.toSeq) // input size == k - val (sample2, count2) = SamplingUtils.reservoirSample(input.iterator, 100) + val (sample2, count2) = SamplingUtils.reservoirSampleAndCount(input.iterator, 100) assert(count2 === 100) assert(input === sample2.toSeq) // input size > k - val (sample3, count3) = SamplingUtils.reservoirSample(input.iterator, 10) + val (sample3, count3) = SamplingUtils.reservoirSampleAndCount(input.iterator, 10) assert(count3 === 100) assert(sample3.length === 10) } From 17bcbf3982fabc027900c9ce791ae3233ba66700 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 18 Jul 2014 00:39:23 -0700 Subject: [PATCH 03/10] Added seed. 
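The seed defaults to Random.nextLong(), so existing callers are unchanged, while
an explicit seed makes the sample reproducible; the range partitioner added later
in this series relies on that by deriving one seed per partition. Sketch of the
intended use (illustrative only):

    val (s1, _) = SamplingUtils.reservoirSampleAndCount((1 to 1000).iterator, 10, seed = 16807L)
    val (s2, _) = SamplingUtils.reservoirSampleAndCount((1 to 1000).iterator, 10, seed = 16807L)
    assert(s1.sameElements(s2))  // same seed => same sample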
--- .../org/apache/spark/util/random/SamplingUtils.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala index 44dc097eb988..d10141b90e62 100644 --- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.random import scala.reflect.ClassTag +import scala.util.Random private[spark] object SamplingUtils { @@ -26,9 +27,14 @@ private[spark] object SamplingUtils { * * @param input input size * @param k reservoir size + * @param seed random seed * @return (samples, input size) */ - def reservoirSampleAndCount[T: ClassTag](input: Iterator[T], k: Int): (Array[T], Int) = { + def reservoirSampleAndCount[T: ClassTag]( + input: Iterator[T], + k: Int, + seed: Long = Random.nextLong()) + : (Array[T], Int) = { val reservoir = new Array[T](k) // Put the first k elements in the reservoir. var i = 0 @@ -46,7 +52,7 @@ private[spark] object SamplingUtils { (trimReservoir, i) } else { // If input size > k, continue the sampling process. - val rand = new XORShiftRandom + val rand = new XORShiftRandom(seed) while (input.hasNext) { val item = input.next() val replacementIndex = rand.nextInt(i) From 9ee9992f8581557ca410cf38a88557d3fd3fe21a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 23 Jul 2014 11:16:01 -0700 Subject: [PATCH 04/10] update range partitioner to run only one job on roughly balanced data --- .../scala/org/apache/spark/Partitioner.scala | 93 ++++++++++++++++--- .../spark/util/random/RandomSampler.scala | 25 +++++ .../org/apache/spark/PartitioningSuite.scala | 6 +- 3 files changed, 107 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 52c018baa5f7..f269dc9abcd5 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -19,7 +19,11 @@ package org.apache.spark import java.io.{IOException, ObjectInputStream, ObjectOutputStream} -import scala.reflect.ClassTag +import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils} + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ClassTag, classTag} +import scala.util.hashing.byteswap32 import org.apache.spark.rdd.RDD import org.apache.spark.serializer.JavaSerializer @@ -108,21 +112,84 @@ class RangePartitioner[K : Ordering : ClassTag, V]( // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { if (partitions == 1) { - Array() + Array.empty } else { - val rddSize = rdd.count() - val maxSampleSize = partitions * 20.0 - val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0) - val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted - if (rddSample.length == 0) { - Array() + // This is the sample size we need to have roughly balanced output partitions. + val sampleSize = 20.0 * partitions + // Assume the input partitions are roughly balanced and over-sample a little bit. 
+ val sampleSizePerPartition = math.ceil(5.0 * sampleSize / rdd.partitions.size).toInt + val shift = rdd.id + val classTagK = classTag[K] + val sketch = rdd.mapPartitionsWithIndex { (idx, iter) => + val seed = byteswap32(idx + shift) + val (sample, n) = SamplingUtils.reservoirSampleAndCount( + iter.map(_._1), sampleSizePerPartition, seed)(classTagK) + Iterator((idx, n, sample)) + }.collect() + var numItems = 0L + sketch.foreach { case (_, n, _) => + numItems += n + } + if (numItems == 0L) { + Array.empty } else { - val bounds = new Array[K](partitions - 1) - for (i <- 0 until partitions - 1) { - val index = (rddSample.length - 1) * (i + 1) / partitions - bounds(i) = rddSample(index) + // If a partition contains much more than the average number of items, we re-sample from it + // to ensure that enough items are collected from that partition. + val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) + val candidates = ArrayBuffer.empty[(K, Float)] + val imbalancedPartitions = ArrayBuffer.empty[Int] + sketch.foreach { case (idx, n, sample) => + if (fraction * n > sampleSizePerPartition) { + imbalancedPartitions += idx + } else { + // The weight is 1 over the sampling probability. + val weight = (n.toDouble / sample.size).toFloat + sample.foreach { key => + candidates += ((key, weight)) + } + } + } + if (imbalancedPartitions.nonEmpty) { + val sampleFunc: (TaskContext, Iterator[Product2[K, V]]) => Array[K] = { (context, iter) => + val random = new XORShiftRandom(byteswap32(context.partitionId - shift)) + iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray + } + val weight = (1.0 / fraction).toFloat + val resultHandler: (Int, Array[K]) => Unit = { (index, sample) => + sample.foreach { key => + candidates += ((key, weight)) + } + } + rdd.context.runJob( + rdd, sampleFunc, imbalancedPartitions, allowLocal = false, resultHandler) + } + var sumWeights: Double = 0.0 + candidates.foreach { case (_, weight) => + sumWeights += weight + } + val step = sumWeights / partitions + var cumWeight = 0.0 + var target = step + val bounds = ArrayBuffer.empty[K] + val sorted = candidates.sortBy(_._1) + var i = 0 + var j = 0 + var previousBound = Option.empty[K] + while ((i < sorted.length) && (j < partitions - 1)) { + val (key, weight) = sorted(i) + cumWeight += weight + if (cumWeight > target) { + // Skip duplicate values. + if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { + bounds += key + target += step + j += 1 + previousBound = Some(key) + } + } + i += 1 } - bounds + bounds.toArray } } } diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 32c5fdad75e5..04c4d78b1de6 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -106,3 +106,28 @@ class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { override def clone = new PoissonSampler[T](mean) } + +/** + * :: DeveloperApi :: + * A sampler selects items based on their importance scores defined in the keys. + * + * The importance score should be within range `[0, 1]`. Items with scores less than or equal to 0 + * would never get selected, while items with scores greater than or equal to 1 would always get + * selected. 
+ * + * @param ratio sampling probability + * @tparam T item type + */ +@DeveloperApi +class ImportanceSampler[T](ratio: Double) extends RandomSampler[(Double, T), (Double, T)] { + + private[random] var rng: Random = new XORShiftRandom + + override def setSeed(seed: Long) = rng.setSeed(seed) + + override def sample(items: Iterator[(Double, T)]): Iterator[(Double, T)] = { + items.filter(item => rng.nextDouble() < ratio) + } + + override def clone = new ImportanceSampler[T](ratio) +} diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 4658a0806428..2138704ff6fb 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -52,14 +52,12 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(p2 === p2) assert(p4 === p4) - assert(p2 != p4) - assert(p4 != p2) + assert(p2 === p4) assert(p4 === anotherP4) assert(anotherP4 === p4) assert(descendingP2 === descendingP2) assert(descendingP4 === descendingP4) - assert(descendingP2 != descendingP4) - assert(descendingP4 != descendingP2) + assert(descendingP2 === descendingP4) assert(p2 != descendingP2) assert(p4 != descendingP4) assert(descendingP2 != p2) From 60be09e9e1e8f9fa7ebb039fa11d925bbce48a08 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 23 Jul 2014 19:41:33 -0700 Subject: [PATCH 05/10] remove importance sampler --- .../scala/org/apache/spark/Partitioner.scala | 3 +-- .../spark/util/random/RandomSampler.scala | 25 ------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index f269dc9abcd5..4d68c8d085a9 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -19,8 +19,6 @@ package org.apache.spark import java.io.{IOException, ObjectInputStream, ObjectOutputStream} -import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils} - import scala.collection.mutable.ArrayBuffer import scala.reflect.{ClassTag, classTag} import scala.util.hashing.byteswap32 @@ -28,6 +26,7 @@ import scala.util.hashing.byteswap32 import org.apache.spark.rdd.RDD import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.{CollectionsUtils, Utils} +import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils} /** * An object that defines how the elements in a key-value pair RDD are partitioned by key. diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 04c4d78b1de6..32c5fdad75e5 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -106,28 +106,3 @@ class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { override def clone = new PoissonSampler[T](mean) } - -/** - * :: DeveloperApi :: - * A sampler selects items based on their importance scores defined in the keys. - * - * The importance score should be within range `[0, 1]`. Items with scores less than or equal to 0 - * would never get selected, while items with scores greater than or equal to 1 would always get - * selected. 
- * - * @param ratio sampling probability - * @tparam T item type - */ -@DeveloperApi -class ImportanceSampler[T](ratio: Double) extends RandomSampler[(Double, T), (Double, T)] { - - private[random] var rng: Random = new XORShiftRandom - - override def setSeed(seed: Long) = rng.setSeed(seed) - - override def sample(items: Iterator[(Double, T)]): Iterator[(Double, T)] = { - items.filter(item => rng.nextDouble() < ratio) - } - - override def clone = new ImportanceSampler[T](ratio) -} From a6e35d62d09ac8b3c348eeb527893b3857b629e7 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 23 Jul 2014 19:55:36 -0700 Subject: [PATCH 06/10] minor update --- .../src/main/scala/org/apache/spark/Partitioner.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 4d68c8d085a9..e10ca425f535 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -154,7 +154,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray } val weight = (1.0 / fraction).toFloat - val resultHandler: (Int, Array[K]) => Unit = { (index, sample) => + val resultHandler: (Int, Array[K]) => Unit = { (_, sample) => sample.foreach { key => candidates += ((key, weight)) } @@ -162,7 +162,8 @@ class RangePartitioner[K : Ordering : ClassTag, V]( rdd.context.runJob( rdd, sampleFunc, imbalancedPartitions, allowLocal = false, resultHandler) } - var sumWeights: Double = 0.0 + val numCandidates = candidates.size + var sumWeights = 0.0 candidates.foreach { case (_, weight) => sumWeights += weight } @@ -170,12 +171,12 @@ class RangePartitioner[K : Ordering : ClassTag, V]( var cumWeight = 0.0 var target = step val bounds = ArrayBuffer.empty[K] - val sorted = candidates.sortBy(_._1) + val orderedCandidates = candidates.sortBy(_._1) var i = 0 var j = 0 var previousBound = Option.empty[K] - while ((i < sorted.length) && (j < partitions - 1)) { - val (key, weight) = sorted(i) + while ((i < numCandidates) && (j < partitions - 1)) { + val (key, weight) = orderedCandidates(i) cumWeight += weight if (cumWeight > target) { // Skip duplicate values. From db58a55395a598c528870f7692b2f5d24eedd9d4 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 23 Jul 2014 20:47:08 -0700 Subject: [PATCH 07/10] add unit tests --- .../scala/org/apache/spark/Partitioner.scala | 7 +++-- .../org/apache/spark/PartitioningSuite.scala | 28 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index e10ca425f535..d99067fb5fef 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -108,6 +108,8 @@ class RangePartitioner[K : Ordering : ClassTag, V]( private var ordering = implicitly[Ordering[K]] + @transient private[spark] var singlePass = true // for unit tests + // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { if (partitions == 1) { @@ -116,7 +118,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( // This is the sample size we need to have roughly balanced output partitions. val sampleSize = 20.0 * partitions // Assume the input partitions are roughly balanced and over-sample a little bit. 
- val sampleSizePerPartition = math.ceil(5.0 * sampleSize / rdd.partitions.size).toInt + val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt val shift = rdd.id val classTagK = classTag[K] val sketch = rdd.mapPartitionsWithIndex { (idx, iter) => @@ -149,9 +151,10 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } } if (imbalancedPartitions.nonEmpty) { + singlePass = false val sampleFunc: (TaskContext, Iterator[Product2[K, V]]) => Array[K] = { (context, iter) => val random = new XORShiftRandom(byteswap32(context.partitionId - shift)) - iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray + iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray(classTagK) } val weight = (1.0 / fraction).toFloat val resultHandler: (Int, Array[K]) => Unit = { (_, sample) => diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 2138704ff6fb..44f800bd61db 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -100,6 +100,34 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet partitioner.getPartition(Row(100)) } + test("RangePartitioner should run only one job if data is roughly balanced") { + val rdd = sc.makeRDD(0 until 20, 20).flatMap { i => + val random = new java.util.Random(i) + Iterator.fill(5000 * i)((random.nextDouble() + i, i)) + }.cache() + for (numPartitions <- Seq(10, 20, 40)) { + val partitioner = new RangePartitioner(numPartitions, rdd) + assert(partitioner.numPartitions === numPartitions) + assert(partitioner.singlePass === true) + val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values + assert(counts.max < 2.0 * counts.min) + } + } + + test("RangePartitioner should work well on unbalanced data") { + val rdd = sc.makeRDD(0 until 20, 20).flatMap { i => + val random = new java.util.Random(i) + Iterator.fill(20 * i * i * i)((random.nextDouble() + i, i)) + }.cache() + for (numPartitions <- Seq(2, 4, 8)) { + val partitioner = new RangePartitioner(numPartitions, rdd) + assert(partitioner.numPartitions === numPartitions) + assert(partitioner.singlePass === false) + val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values + assert(counts.max < 2.0 * counts.min) + } + } + test("HashPartitioner not equal to RangePartitioner") { val rdd = sc.parallelize(1 to 10).map(x => (x, x)) val rangeP2 = new RangePartitioner(2, rdd) From c436d30143bc1efe2637edf57b785220ec3795e3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 23 Jul 2014 21:00:44 -0700 Subject: [PATCH 08/10] fix binary metrics unit tests --- .../BinaryClassificationMetricsSuite.scala | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala index 9d16182f9d8c..94db1dc18323 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala @@ -20,8 +20,26 @@ package org.apache.spark.mllib.evaluation import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.TestingUtils.DoubleWithAlmostEquals class 
BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { + + // TODO: move utility functions to TestingUtils. + + def elementsAlmostEqual(actual: Seq[Double], expected: Seq[Double]): Boolean = { + actual.zip(expected).forall { case (x1, x2) => + x1.almostEquals(x2) + } + } + + def elementsAlmostEqual( + actual: Seq[(Double, Double)], + expected: Seq[(Double, Double)])(implicit dummy: DummyImplicit): Boolean = { + actual.zip(expected).forall { case ((x1, y1), (x2, y2)) => + x1.almostEquals(x2) && y1.almostEquals(y2) + } + } + test("binary evaluation metrics") { val scoreAndLabels = sc.parallelize( Seq((0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)), 2) @@ -41,14 +59,14 @@ class BinaryClassificationMetricsSuite extends FunSuite with LocalSparkContext { val prCurve = Seq((0.0, 1.0)) ++ pr val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r) } val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)} - assert(metrics.thresholds().collect().toSeq === threshold) - assert(metrics.roc().collect().toSeq === rocCurve) - assert(metrics.areaUnderROC() === AreaUnderCurve.of(rocCurve)) - assert(metrics.pr().collect().toSeq === prCurve) - assert(metrics.areaUnderPR() === AreaUnderCurve.of(prCurve)) - assert(metrics.fMeasureByThreshold().collect().toSeq === threshold.zip(f1)) - assert(metrics.fMeasureByThreshold(2.0).collect().toSeq === threshold.zip(f2)) - assert(metrics.precisionByThreshold().collect().toSeq === threshold.zip(precision)) - assert(metrics.recallByThreshold().collect().toSeq === threshold.zip(recall)) + assert(elementsAlmostEqual(metrics.thresholds().collect(), threshold)) + assert(elementsAlmostEqual(metrics.roc().collect(), rocCurve)) + assert(metrics.areaUnderROC().almostEquals(AreaUnderCurve.of(rocCurve))) + assert(elementsAlmostEqual(metrics.pr().collect(), prCurve)) + assert(metrics.areaUnderPR().almostEquals(AreaUnderCurve.of(prCurve))) + assert(elementsAlmostEqual(metrics.fMeasureByThreshold().collect(), threshold.zip(f1))) + assert(elementsAlmostEqual(metrics.fMeasureByThreshold(2.0).collect(), threshold.zip(f2))) + assert(elementsAlmostEqual(metrics.precisionByThreshold().collect(), threshold.zip(precision))) + assert(elementsAlmostEqual(metrics.recallByThreshold().collect(), threshold.zip(recall))) } } From eb95dd8ff267f07226cd5419355c8e0e6ef2c7ea Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sun, 27 Jul 2014 11:57:43 -0700 Subject: [PATCH 09/10] separate sketching and determining bounds impl --- .../scala/org/apache/spark/Partitioner.scala | 141 ++++++++++-------- .../org/apache/spark/PartitioningSuite.scala | 38 ++++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 5 + 3 files changed, 120 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index d99067fb5fef..d5ab7931433b 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -19,11 +19,12 @@ package org.apache.spark import java.io.{IOException, ObjectInputStream, ObjectOutputStream} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.{ClassTag, classTag} import scala.util.hashing.byteswap32 -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.{CollectionsUtils, Utils} import org.apache.spark.util.random.{XORShiftRandom, 
SamplingUtils} @@ -106,31 +107,21 @@ class RangePartitioner[K : Ordering : ClassTag, V]( private var ascending: Boolean = true) extends Partitioner { - private var ordering = implicitly[Ordering[K]] + // We allow partitions = 0, which happens when sorting an empty RDD under the default settings. + require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.") - @transient private[spark] var singlePass = true // for unit tests + private var ordering = implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { - if (partitions == 1) { + if (partitions <= 1) { Array.empty } else { - // This is the sample size we need to have roughly balanced output partitions. - val sampleSize = 20.0 * partitions + // This is the sample size we need to have roughly balanced output partitions, capped at 1M. + val sampleSize = math.min(20.0 * partitions, 1e6) // Assume the input partitions are roughly balanced and over-sample a little bit. val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt - val shift = rdd.id - val classTagK = classTag[K] - val sketch = rdd.mapPartitionsWithIndex { (idx, iter) => - val seed = byteswap32(idx + shift) - val (sample, n) = SamplingUtils.reservoirSampleAndCount( - iter.map(_._1), sampleSizePerPartition, seed)(classTagK) - Iterator((idx, n, sample)) - }.collect() - var numItems = 0L - sketch.foreach { case (_, n, _) => - numItems += n - } + val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { Array.empty } else { @@ -138,8 +129,8 @@ class RangePartitioner[K : Ordering : ClassTag, V]( // to ensure that enough items are collected from that partition. val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) val candidates = ArrayBuffer.empty[(K, Float)] - val imbalancedPartitions = ArrayBuffer.empty[Int] - sketch.foreach { case (idx, n, sample) => + val imbalancedPartitions = mutable.Set.empty[Int] + sketched.foreach { case (idx, n, sample) => if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } else { @@ -151,48 +142,14 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } } if (imbalancedPartitions.nonEmpty) { - singlePass = false - val sampleFunc: (TaskContext, Iterator[Product2[K, V]]) => Array[K] = { (context, iter) => - val random = new XORShiftRandom(byteswap32(context.partitionId - shift)) - iter.map(_._1).filter(t => random.nextDouble() < fraction).toArray(classTagK) - } + // Re-sample imbalanced partitions with the desired sampling probability. 
+ val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) + val seed = byteswap32(-rdd.id - 1) + val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat - val resultHandler: (Int, Array[K]) => Unit = { (_, sample) => - sample.foreach { key => - candidates += ((key, weight)) - } - } - rdd.context.runJob( - rdd, sampleFunc, imbalancedPartitions, allowLocal = false, resultHandler) + candidates ++= reSampled.map(x => (x, weight)) } - val numCandidates = candidates.size - var sumWeights = 0.0 - candidates.foreach { case (_, weight) => - sumWeights += weight - } - val step = sumWeights / partitions - var cumWeight = 0.0 - var target = step - val bounds = ArrayBuffer.empty[K] - val orderedCandidates = candidates.sortBy(_._1) - var i = 0 - var j = 0 - var previousBound = Option.empty[K] - while ((i < numCandidates) && (j < partitions - 1)) { - val (key, weight) = orderedCandidates(i) - cumWeight += weight - if (cumWeight > target) { - // Skip duplicate values. - if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { - bounds += key - target += step - j += 1 - previousBound = Some(key) - } - } - i += 1 - } - bounds.toArray + RangePartitioner.determineBounds(candidates, partitions) } } } @@ -282,3 +239,67 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } } } + +private[spark] object RangePartitioner { + + /** + * Sketches the input RDD via reservoir sampling on each partition. + * + * @param rdd the input RDD to sketch + * @param sampleSizePerPartition max sample size per partition + * @return (total number of items, an array of (partitionId, number of items, sample)) + */ + def sketch[K:ClassTag]( + rdd: RDD[K], + sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { + val shift = rdd.id + // val classTagK = classTag[K] // to avoid serializing the entire partitioner object + val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => + val seed = byteswap32(idx ^ (shift << 16)) + val (sample, n) = SamplingUtils.reservoirSampleAndCount( + iter, sampleSizePerPartition, seed) + Iterator((idx, n, sample)) + }.collect() + val numItems = sketched.map(_._2.toLong).sum + (numItems, sketched) + } + + /** + * Determines the bounds for range partitioning from candidates with weights indicating how many + * items each represents. Usually this is 1 over the probability used to sample this candidate. + * + * @param candidates unordered candidates with weights + * @param partitions number of partitions + * @return selected bounds + */ + def determineBounds[K:Ordering:ClassTag]( + candidates: ArrayBuffer[(K, Float)], + partitions: Int): Array[K] = { + val ordering = implicitly[Ordering[K]] + val ordered = candidates.sortBy(_._1) + val numCandidates = ordered.size + val sumWeights = ordered.map(_._2.toDouble).sum + val step = sumWeights / partitions + var cumWeight = 0.0 + var target = step + val bounds = ArrayBuffer.empty[K] + var i = 0 + var j = 0 + var previousBound = Option.empty[K] + while ((i < numCandidates) && (j < partitions - 1)) { + val (key, weight) = ordered(i) + cumWeight += weight + if (cumWeight > target) { + // Skip duplicate values. 
+ if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { + bounds += key + target += step + j += 1 + previousBound = Some(key) + } + } + i += 1 + } + bounds.toArray + } +} diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 44f800bd61db..fc0cee3e8749 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.collection.mutable.ArrayBuffer import scala.math.abs import org.scalatest.{FunSuite, PrivateMethodTester} @@ -100,6 +101,28 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet partitioner.getPartition(Row(100)) } + test("RangPartitioner.sketch") { + val rdd = sc.makeRDD(0 until 20, 20).flatMap { i => + val random = new java.util.Random(i) + Iterator.fill(i)(random.nextDouble()) + }.cache() + val sampleSizePerPartition = 10 + val (count, sketched) = RangePartitioner.sketch(rdd, sampleSizePerPartition) + assert(count === rdd.count()) + sketched.foreach { case (idx, n, sample) => + assert(n === idx) + assert(sample.size === math.min(n, sampleSizePerPartition)) + } + } + + test("RangePartitioner.determineBounds") { + assert(RangePartitioner.determineBounds(ArrayBuffer.empty[(Int, Float)], 10).isEmpty, + "Bounds on an empty candidates set should be empty.") + val candidates = ArrayBuffer( + (0.7, 2.0f), (0.1, 1.0f), (0.4, 1.0f), (0.3, 1.0f), (0.2, 1.0f), (0.5, 1.0f), (1.0, 3.0f)) + assert(RangePartitioner.determineBounds(candidates, 3) === Array(0.4, 0.7)) + } + test("RangePartitioner should run only one job if data is roughly balanced") { val rdd = sc.makeRDD(0 until 20, 20).flatMap { i => val random = new java.util.Random(i) @@ -108,9 +131,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet for (numPartitions <- Seq(10, 20, 40)) { val partitioner = new RangePartitioner(numPartitions, rdd) assert(partitioner.numPartitions === numPartitions) - assert(partitioner.singlePass === true) val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values - assert(counts.max < 2.0 * counts.min) + assert(counts.max < 3.0 * counts.min) } } @@ -122,12 +144,20 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet for (numPartitions <- Seq(2, 4, 8)) { val partitioner = new RangePartitioner(numPartitions, rdd) assert(partitioner.numPartitions === numPartitions) - assert(partitioner.singlePass === false) val counts = rdd.keys.map(key => partitioner.getPartition(key)).countByValue().values - assert(counts.max < 2.0 * counts.min) + assert(counts.max < 3.0 * counts.min) } } + test("RangePartitioner should return a single partition for empty RDDs") { + val empty1 = sc.emptyRDD[(Int, Double)] + val partitioner1 = new RangePartitioner(0, empty1) + assert(partitioner1.numPartitions === 1) + val empty2 = sc.makeRDD(0 until 2, 2).flatMap(i => Seq.empty[(Int, Double)]) + val partitioner2 = new RangePartitioner(2, empty2) + assert(partitioner2.numPartitions === 1) + } + test("HashPartitioner not equal to RangePartitioner") { val rdd = sc.parallelize(1 to 10).map(x => (x, x)) val rangeP2 = new RangePartitioner(2, rdd) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 2924de112934..e9dcb7fe1b95 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -604,6 +604,11 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("sort an empty RDD") { + val data = sc.emptyRDD[Int] + assert(data.sortBy(x => x).collect() === Array.empty) + } + test("sortByKey") { val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) From 6cc25513d0c89ec01b7a49db2b2358ed6a15afa3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sun, 27 Jul 2014 12:50:50 -0700 Subject: [PATCH 10/10] change foreach to for --- core/src/main/scala/org/apache/spark/Partitioner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index d5ab7931433b..37053bb6f37a 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -136,7 +136,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } else { // The weight is 1 over the sampling probability. val weight = (n.toDouble / sample.size).toFloat - sample.foreach { key => + for (key <- sample) { candidates += ((key, weight)) } }
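The end state of this series computes range bounds in at most two passes: a
reservoir-sampling sketch of every partition, an optional re-sampling of the
partitions the sketch under-represented, and a final weighted scan that turns the
collected keys into bounds. The standalone sketch below is plain Scala with no
Spark dependencies; the object and method names are illustrative, and the logic
is only in the spirit of RangePartitioner.sketch/determineBounds, not a copy of
them.

    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random

    object RangeBoundsSketch {

      // Reservoir sampling (Algorithm R): keep at most k items from an iterator of
      // unknown length and report how many items were seen in total.
      def reservoirSampleAndCount[T](input: Iterator[T], k: Int, rng: Random): (Vector[T], Int) = {
        val reservoir = ArrayBuffer.empty[T]
        var seen = 0
        while (input.hasNext) {
          val item = input.next()
          if (seen < k) {
            reservoir += item
          } else {
            // The current item is the (seen + 1)-th; keep it with probability k / (seen + 1).
            val j = rng.nextInt(seen + 1)
            if (j < k) reservoir(j) = item
          }
          seen += 1
        }
        (reservoir.toVector, seen)
      }

      // Pick (partitions - 1) upper bounds from weighted candidates: walk the sorted
      // candidates and emit a bound each time the cumulative weight crosses a multiple
      // of (total weight / partitions).
      def determineBounds[K: Ordering](candidates: Seq[(K, Float)], partitions: Int): Seq[K] = {
        val ordering = implicitly[Ordering[K]]
        val ordered = candidates.sortBy(_._1)
        val step = ordered.map(_._2.toDouble).sum / partitions
        val bounds = ArrayBuffer.empty[K]
        var cumWeight = 0.0
        var target = step
        var previous = Option.empty[K]
        for ((key, weight) <- ordered if bounds.size < partitions - 1) {
          cumWeight += weight
          // Only emit strictly increasing bounds (duplicate keys are skipped).
          if (cumWeight >= target && previous.forall(ordering.gt(key, _))) {
            bounds += key
            target += step
            previous = Some(key)
          }
        }
        bounds.toSeq
      }

      def main(args: Array[String]): Unit = {
        // Two toy "partitions" of very different sizes, sketched with at most 5 samples each.
        val parts = Seq((1 to 1000).toVector, (2000 to 2009).toVector)
        val rng = new Random(42)
        val sketched = parts.map(p => reservoirSampleAndCount(p.iterator, 5, rng))
        val candidates = sketched.flatMap { case (sample, n) =>
          val weight = n.toFloat / sample.size  // 1 / sampling probability
          sample.map(key => (key, weight))
        }
        // Three output ranges => two bounds; the large partition dominates the total
        // weight, so both bounds land inside 1..1000.
        println(determineBounds(candidates, partitions = 3))
      }
    }

Weighting each key by the inverse of its partition's sampling probability is what
lets partitions sampled at different rates be merged into one sorted scan while
still producing roughly equal-weight output ranges.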