From 383956abadb7f34f131d98109cdb09122f4e30a4 Mon Sep 17 00:00:00 2001
From: sethah
Date: Wed, 13 Apr 2016 11:26:58 -0700
Subject: [PATCH 1/3] BaggedPoint supports sample weights

---
 .../spark/ml/tree/impl/BaggedPoint.scala      | 44 +++++++++++--------
 .../spark/ml/tree/impl/RandomForest.scala     |  4 +-
 .../spark/ml/tree/impl/BaggedPointSuite.scala | 26 ++++++-----
 3 files changed, 43 insertions(+), 31 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
index 4e372702f0c65..c6f6106bd4d42 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
@@ -33,13 +33,13 @@ import org.apache.spark.util.random.XORShiftRandom
  * this datum has 1 copy, 0 copies, and 4 copies in the 3 subsamples, respectively.
  *
  * @param datum Data instance
- * @param subsampleWeights Weight of this instance in each subsampled dataset.
- *
- * TODO: This does not currently support (Double) weighted instances. Once MLlib has weighted
- * dataset support, update. (We store subsampleWeights as Double for this future extension.)
+ * @param subsampleCounts Number of samples of this instance in each subsampled dataset.
+ * @param sampleWeight The weight of this instance.
  */
-private[spark] class BaggedPoint[Datum](val datum: Datum, val subsampleWeights: Array[Double])
-  extends Serializable
+private[spark] class BaggedPoint[Datum](
+    val datum: Datum,
+    val subsampleCounts: Array[Int],
+    val sampleWeight: Double) extends Serializable
 
 private[spark] object BaggedPoint {
 
@@ -52,6 +52,7 @@ private[spark] object BaggedPoint {
    * @param subsamplingRate Fraction of the training data used for learning decision tree.
    * @param numSubsamples Number of subsamples of this RDD to take.
    * @param withReplacement Sampling with/without replacement.
+   * @param extractSampleWeight A function to get the sample weight of each point.
    * @param seed Random seed.
   * @return BaggedPoint dataset representation. 
*/ @@ -60,20 +61,24 @@ private[spark] object BaggedPoint { subsamplingRate: Double, numSubsamples: Int, withReplacement: Boolean, + extractSampleWeight: (Datum => Double) = (_: Datum) => 1.0, seed: Long = Utils.random.nextLong()): RDD[BaggedPoint[Datum]] = { if (withReplacement) { - convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed) + convertToBaggedRDDSamplingWithReplacement(input, extractSampleWeight, subsamplingRate, + numSubsamples, seed) } else { if (numSubsamples == 1 && subsamplingRate == 1.0) { - convertToBaggedRDDWithoutSampling(input) + convertToBaggedRDDWithoutSampling(input, extractSampleWeight) } else { - convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed) + convertToBaggedRDDSamplingWithoutReplacement(input, extractSampleWeight, subsamplingRate, + numSubsamples, seed) } } } private def convertToBaggedRDDSamplingWithoutReplacement[Datum] ( input: RDD[Datum], + extractSampleWeight: (Datum => Double), subsamplingRate: Double, numSubsamples: Int, seed: Long): RDD[BaggedPoint[Datum]] = { @@ -82,22 +87,23 @@ private[spark] object BaggedPoint { val rng = new XORShiftRandom rng.setSeed(seed + partitionIndex + 1) instances.map { instance => - val subsampleWeights = new Array[Double](numSubsamples) + val subsampleCounts = new Array[Int](numSubsamples) var subsampleIndex = 0 while (subsampleIndex < numSubsamples) { val x = rng.nextDouble() - subsampleWeights(subsampleIndex) = { - if (x < subsamplingRate) 1.0 else 0.0 + subsampleCounts(subsampleIndex) = { + if (x < subsamplingRate) 1 else 0 } subsampleIndex += 1 } - new BaggedPoint(instance, subsampleWeights) + new BaggedPoint(instance, subsampleCounts, extractSampleWeight(instance)) } } } private def convertToBaggedRDDSamplingWithReplacement[Datum] ( input: RDD[Datum], + extractSampleWeight: (Datum => Double), subsample: Double, numSubsamples: Int, seed: Long): RDD[BaggedPoint[Datum]] = { @@ -106,20 +112,20 @@ private[spark] object BaggedPoint { val poisson = new PoissonDistribution(subsample) poisson.reseedRandomGenerator(seed + partitionIndex + 1) instances.map { instance => - val subsampleWeights = new Array[Double](numSubsamples) + val subsampleCounts = new Array[Int](numSubsamples) var subsampleIndex = 0 while (subsampleIndex < numSubsamples) { - subsampleWeights(subsampleIndex) = poisson.sample() + subsampleCounts(subsampleIndex) = poisson.sample() subsampleIndex += 1 } - new BaggedPoint(instance, subsampleWeights) + new BaggedPoint(instance, subsampleCounts, extractSampleWeight(instance)) } } } private def convertToBaggedRDDWithoutSampling[Datum] ( - input: RDD[Datum]): RDD[BaggedPoint[Datum]] = { - input.map(datum => new BaggedPoint(datum, Array(1.0))) + input: RDD[Datum], + extractSampleWeight: (Datum => Double)): RDD[BaggedPoint[Datum]] = { + input.map(datum => new BaggedPoint(datum, Array(1), extractSampleWeight(datum))) } - } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index 7b1fd089f2943..db3e2f1986bf6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -125,7 +125,7 @@ private[spark] object RandomForest extends Logging { val withReplacement = numTrees > 1 val baggedInput = BaggedPoint - .convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, withReplacement, seed) + .convertToBaggedRDD(treeInput, strategy.subsamplingRate, 
numTrees, withReplacement, seed = seed) .persist(StorageLevel.MEMORY_AND_DISK) // depth of the decision tree @@ -407,7 +407,7 @@ private[spark] object RandomForest extends Logging { if (nodeInfo != null) { val aggNodeIndex = nodeInfo.nodeIndexInGroup val featuresForNode = nodeInfo.featureSubset - val instanceWeight = baggedPoint.subsampleWeights(treeIndex) + val instanceWeight = baggedPoint.subsampleCounts(treeIndex) if (metadata.unorderedFeatures.isEmpty) { orderedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, instanceWeight, featuresForNode) } else { diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala index 77ab3d8bb75f7..4593e2b0b760d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala @@ -29,9 +29,9 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext { test("BaggedPoint RDD: without subsampling") { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, 42) + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, 1, false, seed = 42) baggedRDD.collect().foreach { baggedPoint => - assert(baggedPoint.subsampleWeights.size == 1 && baggedPoint.subsampleWeights(0) == 1) + assert(baggedPoint.subsampleCounts.size == 1 && baggedPoint.subsampleCounts(0) == 1) } } @@ -43,8 +43,9 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed => - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true, seed) - val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, true, seed = seed) + val subsampleCounts: Array[Array[Double]] = + baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) } @@ -59,8 +60,10 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed => - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true, seed) - val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() + val baggedRDD = + BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, true, seed = seed) + val subsampleCounts: Array[Array[Double]] = + baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) } @@ -74,8 +77,9 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed => - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, false, seed) - val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() + val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, 1.0, numSubsamples, false, seed = seed) + val subsampleCounts: Array[Array[Double]] = + baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect() 
EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) } @@ -90,8 +94,10 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext { val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000) val rdd = sc.parallelize(arr) seeds.foreach { seed => - val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false, seed) - val subsampleCounts: Array[Array[Double]] = baggedRDD.map(_.subsampleWeights).collect() + val baggedRDD = + BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false, seed = seed) + val subsampleCounts: Array[Array[Double]] = + baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect() EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean, expectedStddev, epsilon = 0.01) } From 6f6c2a178de2c29d6ddfa8e3d57e6684d2401179 Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 13 Apr 2016 14:42:51 -0700 Subject: [PATCH 2/3] added a test --- .../spark/ml/tree/impl/BaggedPoint.scala | 9 ++++++- .../spark/ml/tree/impl/BaggedPointSuite.scala | 26 +++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala index c6f6106bd4d42..cad73fe6cf9f5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala @@ -39,7 +39,14 @@ import org.apache.spark.util.random.XORShiftRandom private[spark] class BaggedPoint[Datum]( val datum: Datum, val subsampleCounts: Array[Int], - val sampleWeight: Double) extends Serializable + val sampleWeight: Double) extends Serializable { + + /** + * Subsample counts weighted by the sample weight. 
+   */
+  def weightedCounts: Array[Double] = subsampleCounts.map(_ * sampleWeight)
+
+}
 
 private[spark] object BaggedPoint {
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
index 4593e2b0b760d..603e80b94936b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.ml.tree.impl
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.EnsembleTestHelper
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
@@ -94,12 +95,33 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext {
     val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, 1000)
     val rdd = sc.parallelize(arr)
     seeds.foreach { seed =>
-      val baggedRDD =
-        BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false, seed = seed)
+      val baggedRDD = BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false,
+        seed = seed)
       val subsampleCounts: Array[Array[Double]] =
         baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect()
       EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean,
         expectedStddev, epsilon = 0.01)
     }
   }
+
+  test("BaggedPoint RDD: with subsampling without replacement (fraction = 0.5) with weights") {
+    val numSubsamples = 100
+    val subsample = 0.5
+    val constantWeight = 0.1
+    val expectedMean = subsample * constantWeight
+    val expectedStddev = math.sqrt(subsample * (1 - subsample)) * constantWeight
+    val seeds = Array(123, 5354, 230, 349867, 23987)
+    val numSamples = 1000
+    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, numSamples)
+    val rdd = sc.parallelize(arr)
+    seeds.foreach { seed =>
+      val baggedRDD =
+        BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false,
+          (lp: LabeledPoint) => constantWeight, seed = seed)
+      val subsampleCounts: Array[Array[Double]] =
+        baggedRDD.map(point => point.weightedCounts).collect()
+      EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean,
+        expectedStddev, epsilon = 0.01)
+    }
+  }
 }

From a673658c4d2f6121b7854be80ade481ae2c18ddb Mon Sep 17 00:00:00 2001
From: sethah
Date: Wed, 13 Apr 2016 14:49:20 -0700
Subject: [PATCH 3/3] remove weighted test and fix style

---
 .../spark/ml/tree/impl/RandomForest.scala     |  3 ++-
 .../spark/ml/tree/impl/BaggedPointSuite.scala | 22 -------------------
 2 files changed, 2 insertions(+), 23 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index db3e2f1986bf6..d0cb880625fde 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -125,7 +125,8 @@ private[spark] object RandomForest extends Logging {
     val withReplacement = numTrees > 1
 
     val baggedInput = BaggedPoint
-      .convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, withReplacement, seed = seed)
+      .convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, withReplacement,
+        seed = seed)
       .persist(StorageLevel.MEMORY_AND_DISK)
 
     // depth of the decision tree
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
index 603e80b94936b..ec20148828afc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
@@ -18,7 +18,6 @@ package org.apache.spark.ml.tree.impl
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.EnsembleTestHelper
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
@@ -103,25 +102,4 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext {
       expectedStddev, epsilon = 0.01)
     }
   }
-
-  test("BaggedPoint RDD: with subsampling without replacement (fraction = 0.5) with weights") {
-    val numSubsamples = 100
-    val subsample = 0.5
-    val constantWeight = 0.1
-    val expectedMean = subsample * constantWeight
-    val expectedStddev = math.sqrt(subsample * (1 - subsample)) * constantWeight
-    val seeds = Array(123, 5354, 230, 349867, 23987)
-    val numSamples = 1000
-    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(1, numSamples)
-    val rdd = sc.parallelize(arr)
-    seeds.foreach { seed =>
-      val baggedRDD =
-        BaggedPoint.convertToBaggedRDD(rdd, subsample, numSubsamples, false,
-          (lp: LabeledPoint) => constantWeight, seed = seed)
-      val subsampleCounts: Array[Array[Double]] =
-        baggedRDD.map(point => point.weightedCounts).collect()
-      EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean,
-        expectedStddev, epsilon = 0.01)
-    }
-  }
 }
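
---

Note on the semantics (illustrative, not Spark API): BaggedPoint and
convertToBaggedRDD are private[spark], so the sketch below re-implements the
behavior of this series in self-contained Scala instead of calling Spark.
MiniBaggedPoint, bag, and MiniBaggingCheck are hypothetical names introduced
only for illustration. The sketch mirrors the Bernoulli (without-replacement)
path and checks the arithmetic asserted by the weighted test from patch 2/3:
with subsampling rate p and a constant sample weight w, each weighted count
has mean p * w and standard deviation sqrt(p * (1 - p)) * w.

import scala.util.Random

final case class MiniBaggedPoint[Datum](
    datum: Datum,
    subsampleCounts: Array[Int],
    sampleWeight: Double) {
  // Same definition as BaggedPoint.weightedCounts in patch 2/3.
  def weightedCounts: Array[Double] = subsampleCounts.map(_ * sampleWeight)
}

object MiniBaggingCheck {

  // Bernoulli subsampling without replacement: each of the numSubsamples
  // counts for a datum is independently 1 with probability subsamplingRate.
  def bag[Datum](
      input: Seq[Datum],
      subsamplingRate: Double,
      numSubsamples: Int,
      extractSampleWeight: Datum => Double = (_: Datum) => 1.0,
      seed: Long = 42L): Seq[MiniBaggedPoint[Datum]] = {
    val rng = new Random(seed)
    input.map { datum =>
      val counts = Array.fill(numSubsamples) {
        if (rng.nextDouble() < subsamplingRate) 1 else 0
      }
      MiniBaggedPoint(datum, counts, extractSampleWeight(datum))
    }
  }

  def main(args: Array[String]): Unit = {
    val subsample = 0.5
    val constantWeight = 0.1
    val weighted = bag(0 until 100000, subsample, 1, _ => constantWeight)
      .map(_.weightedCounts(0))
    val mean = weighted.sum / weighted.size
    val stddev =
      math.sqrt(weighted.map(x => (x - mean) * (x - mean)).sum / weighted.size)
    // Expected: mean = 0.5 * 0.1 = 0.05 and stddev = sqrt(0.5 * 0.5) * 0.1 = 0.05,
    // matching expectedMean and expectedStddev in the removed weighted test.
    println(f"mean = $mean%.4f, stddev = $stddev%.4f")
  }
}

Storing Int counts plus one Double weight per point, rather than the old
Array[Double] of per-subsample weights, lets weightedCounts recover the
previous behavior exactly when every weight is 1.0, which is why the default
extractor in the patch is (_ => 1.0).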
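
A companion sketch for the with-replacement path. Patch 1/3 keeps that path's
existing generator, drawing each count from commons-math3's
PoissonDistribution(subsamplingRate); because a Poisson variable's variance
equals its mean, a weighted count w * Poisson(lambda) has mean lambda * w and
standard deviation sqrt(lambda) * w. The object name and constants below are
illustrative, and the snippet assumes commons-math3 on the classpath (it is
already a Spark dependency).

import org.apache.commons.math3.distribution.PoissonDistribution

object PoissonBaggingCheck {
  def main(args: Array[String]): Unit = {
    val subsamplingRate = 1.0
    val weight = 0.1
    // The same distribution the patched with-replacement path samples from.
    val poisson = new PoissonDistribution(subsamplingRate)
    poisson.reseedRandomGenerator(42L)
    val weighted = Array.fill(100000)(poisson.sample() * weight)
    val mean = weighted.sum / weighted.length
    val stddev =
      math.sqrt(weighted.map(x => (x - mean) * (x - mean)).sum / weighted.length)
    // Expected: mean = 1.0 * 0.1 = 0.1 and stddev = sqrt(1.0) * 0.1 = 0.1.
    println(f"mean = $mean%.4f, stddev = $stddev%.4f")
  }
}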