From a751ec6863cdc17094386d2d8ca8171f077497af Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 5 Feb 2014 15:16:54 -0800 Subject: [PATCH 01/21] Add k-fold cross validation to MLLib --- .../org/apache/spark/rdd/FoldedRDD.scala | 71 +++++++++++++++++++ .../main/scala/org/apache/spark/rdd/RDD.scala | 11 +++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 17 +++++ .../org/apache/spark/mllib/util/MLUtils.scala | 33 +++++++++ .../spark/mllib/util/MLUtilsSuite.scala | 61 ++++++++++++++++ 5 files changed, 193 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala new file mode 100644 index 0000000000000..4aefbad49e8a6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag +import java.util.Random + +import cern.jet.random.Poisson +import cern.jet.random.engine.DRand + +import org.apache.spark.{Partition, TaskContext} + +private[spark] +class FoldedRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable { + override val index: Int = prev.index +} + +class FoldedRDD[T: ClassTag]( + prev: RDD[T], + fold: Int, + folds: Int, + seed: Int) + extends RDD[T](prev) { + + override def getPartitions: Array[Partition] = { + val rg = new Random(seed) + firstParent[T].partitions.map(x => new FoldedRDDPartition(x, rg.nextInt)) + } + + override def getPreferredLocations(split: Partition): Seq[String] = + firstParent[T].preferredLocations(split.asInstanceOf[FoldedRDDPartition].prev) + + override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { + val split = splitIn.asInstanceOf[FoldedRDDPartition] + val rand = new Random(split.seed) + firstParent[T].iterator(split.prev, context).filter(x => (rand.nextInt(folds) == fold-1)) + } +} + +/** + * A companion class to FoldedRDD which contains all of the elements not in the fold for the same + * fold/seed combination. 
Useful for cross validation + */ +class CompositeFoldedRDD[T: ClassTag]( + prev: RDD[T], + fold: Int, + folds: Int, + seed: Int) + extends FoldedRDD[T](prev, fold, folds, seed) { + + override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { + val split = splitIn.asInstanceOf[FoldedRDDPartition] + val rand = new Random(split.seed) + firstParent[T].iterator(split.prev, context).filter(x => (rand.nextInt(folds) != fold-1)) + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bf3c57ad41eb2..b5197cebba022 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -334,6 +334,17 @@ abstract class RDD[T: ClassTag]( }.toArray } + /** + * Return a k element list of pairs of RDDs with the first element of each pair + * containing a unique 1/Kth of the data and the second element contain the composite of that. + */ + def kFoldRdds(folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { + 1.to(folds).map(fold => (( + new FoldedRDD(this, fold, folds, seed), + new CompositeFoldedRDD(this, fold, folds, seed) + ))).toList + } + def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = { var fraction = 0.0 var total = 0 diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 25973348a7837..28820f0eb6266 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -513,6 +513,23 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("kfoldRdd") { + val data = sc.parallelize(1 to 100, 2) + for (folds <- 1 to 10) { + for (seed <- 1 to 5) { + val foldedRdds = data.kFoldRdds(folds, seed) + assert(foldedRdds.size === folds) + foldedRdds.map{case (test, train) => + assert(test.union(train).collect().sorted === data.collect().sorted, + "Each training+test set combined contains all of the data") + } + // K fold cross validation should only have each element in the test set exactly once + assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === + data.collect().sorted) + } + } + } + test("runJob on an invalid partition") { intercept[IllegalArgumentException] { sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index cb85e433bfc73..cad257290542b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.regression.RegressionModel /** * Helper methods to load, save and pre-process data used in ML Lib. @@ -171,6 +172,38 @@ object MLUtils { dataStr.saveAsTextFile(dir) } + def meanSquaredError(a: Double, b: Double): Double = { + (a-b)*(a-b) + } + + /** + * Function to perform cross validation on a single learner. 
+ * + * @param data - input data set + * @param folds - the number of folds (must be > 1) + * @param learner - function to produce a model + * @param errorFunction - function to compute the error of a given point + * + * @return the average error on the cross validated data. + */ + def crossValidate(data: RDD[LabeledPoint], folds: Int, seed: Int, + learner: (RDD[LabeledPoint] => RegressionModel), + errorFunction: ((Double,Double) => Double) = meanSquaredError): Double = { + if (folds <= 1) { + throw new IllegalArgumentException("Cross validation requires more than one fold") + } + val rdds = data.kFoldRdds(folds, seed) + val errorRates = rdds.map{case (testData, trainingData) => + val model = learner(trainingData) + val predictions = model.predict(testData.map(_.features)) + val errors = predictions.zip(testData.map(_.label)).map{case (x,y) => errorFunction(x,y)} + errors.sum() + } + val averageError = errorRates.sum / data.count + averageError + } + + /** * Utility function to compute mean and standard deviation on a given dataset. * diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 27d41c7869aa0..5ea55bce47908 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -32,6 +32,18 @@ import org.apache.spark.mllib.util.MLUtils._ class MLUtilsSuite extends FunSuite with LocalSparkContext { + @transient private var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + } + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + test("epsilon computation") { assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.") assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.") @@ -106,4 +118,53 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { case t: Throwable => } } + + // This learner always says everything is 0 + def terribleLearner(trainingData: RDD[LabeledPoint]): RegressionModel = { + object AlwaysZero extends RegressionModel { + override def predict(testData: RDD[Array[Double]]): RDD[Double] = { + testData.map(_ => 0) + } + override def predict(testData: Array[Double]): Double = { + 0 + } + } + AlwaysZero + } + + // Always returns its input + def exactLearner(trainingData: RDD[LabeledPoint]): RegressionModel = { + new LinearRegressionModel(Array(1.0), 0) + } + + test("Test cross validation with a terrible learner") { + val data = sc.parallelize(1.to(100).zip(1.to(100))).map( + x => LabeledPoint(x._1, Array(x._2))) + val expectedError = 1.to(100).map(x => x*x).sum / 100.0 + for (seed <- 1 to 5) { + for (folds <- 2 to 5) { + val avgError = MLUtils.crossValidate(data, folds, seed, terribleLearner) + avgError should equal (expectedError) + } + } + } + test("Test cross validation with a reasonable learner") { + val data = sc.parallelize(1.to(100).zip(1.to(100))).map( + x => LabeledPoint(x._1, Array(x._2))) + for (seed <- 1 to 5) { + for (folds <- 2 to 5) { + val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner) + avgError should equal (0) + } + } + } + + test("Cross validation requires more than one fold") { + val data = sc.parallelize(1.to(100).zip(1.to(100))).map( + x => LabeledPoint(x._1, Array(x._2))) + val thrown = intercept[java.lang.IllegalArgumentException] { + val avgError = MLUtils.crossValidate(data, 1, 1, terribleLearner) + } + assert(thrown.getClass === 
classOf[IllegalArgumentException]) + } } From 08f8e4d930e1d9f68f06783dd87f9076bb6fb1a7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Feb 2014 11:05:54 -0800 Subject: [PATCH 02/21] Fix BernoulliSampler to respect complement --- .../main/scala/org/apache/spark/util/random/RandomSampler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 0f1fca4813ba9..850c94bef9ea8 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -63,7 +63,7 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) } } - override def clone = new BernoulliSampler[T](lb, ub) + override def clone = new BernoulliSampler[T](lb, ub, complement) } /** From c0b7fa4d06dec185cc1695121411c709b904967c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Feb 2014 11:06:33 -0800 Subject: [PATCH 03/21] Switch FoldedRDD to use BernoulliSampler and PartitionwiseSampledRDD --- .../org/apache/spark/rdd/FoldedRDD.scala | 33 ++++--------------- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 ++++++++-- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala index 4aefbad49e8a6..b808f75cb0ba8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala @@ -24,6 +24,7 @@ import cern.jet.random.Poisson import cern.jet.random.engine.DRand import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.util.random.BernoulliSampler private[spark] class FoldedRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable { @@ -32,24 +33,10 @@ class FoldedRDDPartition(val prev: Partition, val seed: Int) extends Partition w class FoldedRDD[T: ClassTag]( prev: RDD[T], - fold: Int, - folds: Int, + fold: Float, + folds: Float, seed: Int) - extends RDD[T](prev) { - - override def getPartitions: Array[Partition] = { - val rg = new Random(seed) - firstParent[T].partitions.map(x => new FoldedRDDPartition(x, rg.nextInt)) - } - - override def getPreferredLocations(split: Partition): Seq[String] = - firstParent[T].preferredLocations(split.asInstanceOf[FoldedRDDPartition].prev) - - override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { - val split = splitIn.asInstanceOf[FoldedRDDPartition] - val rand = new Random(split.seed) - firstParent[T].iterator(split.prev, context).filter(x => (rand.nextInt(folds) == fold-1)) - } + extends PartitionwiseSampledRDD[T, T](prev, new BernoulliSampler((fold-1)/folds,fold/folds, false), seed) { } /** @@ -58,14 +45,8 @@ class FoldedRDD[T: ClassTag]( */ class CompositeFoldedRDD[T: ClassTag]( prev: RDD[T], - fold: Int, - folds: Int, + fold: Float, + folds: Float, seed: Int) - extends FoldedRDD[T](prev, fold, folds, seed) { - - override def compute(splitIn: Partition, context: TaskContext): Iterator[T] = { - val split = splitIn.asInstanceOf[FoldedRDDPartition] - val rand = new Random(split.seed) - firstParent[T].iterator(split.prev, context).filter(x => (rand.nextInt(folds) != fold-1)) - } + extends PartitionwiseSampledRDD[T, T](prev, new BernoulliSampler((fold-1)/folds, fold/folds, true), seed) { } diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 28820f0eb6266..9466c94d2208a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -513,14 +513,28 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } + test("FoldedRDD") { + val data = sc.parallelize(1 to 100, 2) + val lowerFoldedRdd = new FoldedRDD(data, 1, 2, 1) + val upperFoldedRdd = new FoldedRDD(data, 2, 2, 1) + val lowerCompositeFoldedRdd = new CompositeFoldedRDD(data, 1, 2, 1) + assert(lowerFoldedRdd.collect().sorted.size == 50) + assert(lowerCompositeFoldedRdd.collect().sorted.size == 50) + assert(lowerFoldedRdd.subtract(lowerCompositeFoldedRdd).collect().sorted === + lowerFoldedRdd.collect().sorted) + assert(upperFoldedRdd.collect().sorted.size == 50) + } + test("kfoldRdd") { val data = sc.parallelize(1 to 100, 2) - for (folds <- 1 to 10) { + val collectedData = data.collect().sorted + for (folds <- 2 to 10) { for (seed <- 1 to 5) { val foldedRdds = data.kFoldRdds(folds, seed) assert(foldedRdds.size === folds) foldedRdds.map{case (test, train) => - assert(test.union(train).collect().sorted === data.collect().sorted, + val result = test.union(train).collect().sorted + assert(result === collectedData, "Each training+test set combined contains all of the data") } // K fold cross validation should only have each element in the test set exactly once From dd0b7373546d7571e8b0e0c85299bf94232cc461 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Feb 2014 11:28:47 -0800 Subject: [PATCH 04/21] Wrap long lines (oops) --- core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala index b808f75cb0ba8..050ae4b447ac9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala @@ -36,7 +36,8 @@ class FoldedRDD[T: ClassTag]( fold: Float, folds: Float, seed: Int) - extends PartitionwiseSampledRDD[T, T](prev, new BernoulliSampler((fold-1)/folds,fold/folds, false), seed) { + extends PartitionwiseSampledRDD[T, T](prev, + new BernoulliSampler((fold-1)/folds,fold/folds, false), seed) { } /** @@ -48,5 +49,6 @@ class CompositeFoldedRDD[T: ClassTag]( fold: Float, folds: Float, seed: Int) - extends PartitionwiseSampledRDD[T, T](prev, new BernoulliSampler((fold-1)/folds, fold/folds, true), seed) { + extends PartitionwiseSampledRDD[T, T](prev, + new BernoulliSampler((fold-1)/folds, fold/folds, true), seed) { } From 264502a2e953bf6755ca4b8651176a7281a682e1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 11 Feb 2014 11:38:58 -0800 Subject: [PATCH 05/21] Add a test for the bug that was found with BernoulliSampler not copying the complement param --- .../spark/util/random/RandomSamplerSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala index 7576c9a51f313..71874d16ade67 100644 --- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala @@ -48,6 +48,20 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } + test("BernoulliSamplerWithRangeInverse") { + expecting { + for(x <- Seq(0.1, 
0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { + random.nextDouble().andReturn(x) + } + } + whenExecuting(random) + { + val sampler = new BernoulliSampler[Int](0.25, 0.55, true)(random) + assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9)) + } + } + + test("BernoulliSamplerWithRatio") { expecting { for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { From 91eae64dc0731c845b03f17d052e26214fd9d28f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 12 Feb 2014 12:11:19 -0800 Subject: [PATCH 06/21] Consolidate things in mlutils --- .../org/apache/spark/rdd/FoldedRDD.scala | 54 ------------------- .../main/scala/org/apache/spark/rdd/RDD.scala | 11 ---- .../scala/org/apache/spark/rdd/RDDSuite.scala | 31 ----------- .../org/apache/spark/mllib/util/MLUtils.scala | 32 +++++++++-- .../spark/mllib/util/MLUtilsSuite.scala | 41 +++++++++----- 5 files changed, 56 insertions(+), 113 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala deleted file mode 100644 index 050ae4b447ac9..0000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/FoldedRDD.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag -import java.util.Random - -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand - -import org.apache.spark.{Partition, TaskContext} -import org.apache.spark.util.random.BernoulliSampler - -private[spark] -class FoldedRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable { - override val index: Int = prev.index -} - -class FoldedRDD[T: ClassTag]( - prev: RDD[T], - fold: Float, - folds: Float, - seed: Int) - extends PartitionwiseSampledRDD[T, T](prev, - new BernoulliSampler((fold-1)/folds,fold/folds, false), seed) { -} - -/** - * A companion class to FoldedRDD which contains all of the elements not in the fold for the same - * fold/seed combination. 
Useful for cross validation - */ -class CompositeFoldedRDD[T: ClassTag]( - prev: RDD[T], - fold: Float, - folds: Float, - seed: Int) - extends PartitionwiseSampledRDD[T, T](prev, - new BernoulliSampler((fold-1)/folds, fold/folds, true), seed) { -} diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b5197cebba022..bf3c57ad41eb2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -334,17 +334,6 @@ abstract class RDD[T: ClassTag]( }.toArray } - /** - * Return a k element list of pairs of RDDs with the first element of each pair - * containing a unique 1/Kth of the data and the second element contain the composite of that. - */ - def kFoldRdds(folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { - 1.to(folds).map(fold => (( - new FoldedRDD(this, fold, folds, seed), - new CompositeFoldedRDD(this, fold, folds, seed) - ))).toList - } - def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = { var fraction = 0.0 var total = 0 diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 9466c94d2208a..25973348a7837 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -513,37 +513,6 @@ class RDDSuite extends FunSuite with SharedSparkContext { } } - test("FoldedRDD") { - val data = sc.parallelize(1 to 100, 2) - val lowerFoldedRdd = new FoldedRDD(data, 1, 2, 1) - val upperFoldedRdd = new FoldedRDD(data, 2, 2, 1) - val lowerCompositeFoldedRdd = new CompositeFoldedRDD(data, 1, 2, 1) - assert(lowerFoldedRdd.collect().sorted.size == 50) - assert(lowerCompositeFoldedRdd.collect().sorted.size == 50) - assert(lowerFoldedRdd.subtract(lowerCompositeFoldedRdd).collect().sorted === - lowerFoldedRdd.collect().sorted) - assert(upperFoldedRdd.collect().sorted.size == 50) - } - - test("kfoldRdd") { - val data = sc.parallelize(1 to 100, 2) - val collectedData = data.collect().sorted - for (folds <- 2 to 10) { - for (seed <- 1 to 5) { - val foldedRdds = data.kFoldRdds(folds, seed) - assert(foldedRdds.size === folds) - foldedRdds.map{case (test, train) => - val result = test.union(train).collect().sorted - assert(result === collectedData, - "Each training+test set combined contains all of the data") - } - // K fold cross validation should only have each element in the test set exactly once - assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === - data.collect().sorted) - } - } - } - test("runJob on an invalid partition") { intercept[IllegalArgumentException] { sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index cad257290542b..22f2ad6398617 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -22,6 +22,15 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV, import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import scala.reflect._ + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.PartitionwiseSampledRDD +import org.apache.spark.SparkContext._ +import org.apache.spark.util.random.BernoulliSampler + +import 
org.jblas.DoubleMatrix import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.regression.RegressionModel @@ -176,6 +185,21 @@ object MLUtils { (a-b)*(a-b) } + /** + * Return a k element list of pairs of RDDs with the first element of each pair + * containing a unique 1/Kth of the data and the second element contain the composite of that. + */ + def kFoldRdds[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { + val foldsF = folds.toFloat + 1.to(folds).map(fold => (( + new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, false), + seed), + new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, true), + seed) + ))).toList + } + + /** * Function to perform cross validation on a single learner. * @@ -192,14 +216,14 @@ object MLUtils { if (folds <= 1) { throw new IllegalArgumentException("Cross validation requires more than one fold") } - val rdds = data.kFoldRdds(folds, seed) + val rdds = kFoldRdds(data, folds, seed) val errorRates = rdds.map{case (testData, trainingData) => val model = learner(trainingData) - val predictions = model.predict(testData.map(_.features)) - val errors = predictions.zip(testData.map(_.label)).map{case (x,y) => errorFunction(x,y)} + val predictions = testData.map(data => (data.label, model.predict(data.features))) + val errors = predictions.map{case (x, y) => errorFunction(x, y)} errors.sum() } - val averageError = errorRates.sum / data.count + val averageError = errorRates.sum / data.count.toFloat averageError } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 5ea55bce47908..bea65984f47f0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -43,7 +43,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { System.clearProperty("spark.driver.port") } - test("epsilon computation") { assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.") assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.") @@ -137,20 +136,11 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { new LinearRegressionModel(Array(1.0), 0) } - test("Test cross validation with a terrible learner") { - val data = sc.parallelize(1.to(100).zip(1.to(100))).map( - x => LabeledPoint(x._1, Array(x._2))) - val expectedError = 1.to(100).map(x => x*x).sum / 100.0 - for (seed <- 1 to 5) { - for (folds <- 2 to 5) { - val avgError = MLUtils.crossValidate(data, folds, seed, terribleLearner) - avgError should equal (expectedError) - } - } - } test("Test cross validation with a reasonable learner") { val data = sc.parallelize(1.to(100).zip(1.to(100))).map( x => LabeledPoint(x._1, Array(x._2))) + val features = data.map(_.features) + val labels = data.map(_.label) for (seed <- 1 to 5) { for (folds <- 2 to 5) { val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner) @@ -163,8 +153,33 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { val data = sc.parallelize(1.to(100).zip(1.to(100))).map( x => LabeledPoint(x._1, Array(x._2))) val thrown = intercept[java.lang.IllegalArgumentException] { - val avgError = MLUtils.crossValidate(data, 1, 1, terribleLearner) + val avgError = MLUtils.crossValidate(data, 1, 1, exactLearner) } assert(thrown.getClass === 
classOf[IllegalArgumentException]) } + + test("kfoldRdd") { + val data = sc.parallelize(1 to 100, 2) + val collectedData = data.collect().sorted + val twoFoldedRdd = MLUtils.kFoldRdds(data, 2, 1) + assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted) + assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted) + for (folds <- 2 to 10) { + for (seed <- 1 to 5) { + val foldedRdds = MLUtils.kFoldRdds(data, folds, seed) + assert(foldedRdds.size === folds) + foldedRdds.map{case (test, train) => + val result = test.union(train).collect().sorted + assert(test.collect().size > 0, "Non empty test data") + assert(train.collect().size > 0, "Non empty training data") + assert(result === collectedData, + "Each training+test set combined contains all of the data") + } + // K fold cross validation should only have each element in the test set exactly once + assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === + data.collect().sorted) + } + } + } + } From b78804e54b0c350c54805b12057a3127255412f3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 12 Feb 2014 12:13:16 -0800 Subject: [PATCH 07/21] Remove cross validation [TODO in another pull request] --- .../org/apache/spark/mllib/util/MLUtils.scala | 29 ------------------- .../spark/mllib/util/MLUtilsSuite.scala | 22 -------------- 2 files changed, 51 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 22f2ad6398617..7617d0adbf0e0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -199,35 +199,6 @@ object MLUtils { ))).toList } - - /** - * Function to perform cross validation on a single learner. - * - * @param data - input data set - * @param folds - the number of folds (must be > 1) - * @param learner - function to produce a model - * @param errorFunction - function to compute the error of a given point - * - * @return the average error on the cross validated data. - */ - def crossValidate(data: RDD[LabeledPoint], folds: Int, seed: Int, - learner: (RDD[LabeledPoint] => RegressionModel), - errorFunction: ((Double,Double) => Double) = meanSquaredError): Double = { - if (folds <= 1) { - throw new IllegalArgumentException("Cross validation requires more than one fold") - } - val rdds = kFoldRdds(data, folds, seed) - val errorRates = rdds.map{case (testData, trainingData) => - val model = learner(trainingData) - val predictions = testData.map(data => (data.label, model.predict(data.features))) - val errors = predictions.map{case (x, y) => errorFunction(x, y)} - errors.sum() - } - val averageError = errorRates.sum / data.count.toFloat - averageError - } - - /** * Utility function to compute mean and standard deviation on a given dataset. 
* diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index bea65984f47f0..a3dd766d78fcc 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -136,28 +136,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { new LinearRegressionModel(Array(1.0), 0) } - test("Test cross validation with a reasonable learner") { - val data = sc.parallelize(1.to(100).zip(1.to(100))).map( - x => LabeledPoint(x._1, Array(x._2))) - val features = data.map(_.features) - val labels = data.map(_.label) - for (seed <- 1 to 5) { - for (folds <- 2 to 5) { - val avgError = MLUtils.crossValidate(data, folds, seed, exactLearner) - avgError should equal (0) - } - } - } - - test("Cross validation requires more than one fold") { - val data = sc.parallelize(1.to(100).zip(1.to(100))).map( - x => LabeledPoint(x._1, Array(x._2))) - val thrown = intercept[java.lang.IllegalArgumentException] { - val avgError = MLUtils.crossValidate(data, 1, 1, exactLearner) - } - assert(thrown.getClass === classOf[IllegalArgumentException]) - } - test("kfoldRdd") { val data = sc.parallelize(1 to 100, 2) val collectedData = data.collect().sorted From e8741a75726e6dddd38c2602a3a19dbab9d503ef Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 16 Feb 2014 18:17:51 -0800 Subject: [PATCH 08/21] CR feedback --- .../org/apache/spark/mllib/util/MLUtils.scala | 18 +++++++--------- .../spark/mllib/util/MLUtilsSuite.scala | 21 +++++++++++++++---- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 7617d0adbf0e0..ef2b25a39b81e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -22,7 +22,7 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV, import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import scala.reflect._ +import scala.reflect.ClassTag import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -181,21 +181,17 @@ object MLUtils { dataStr.saveAsTextFile(dir) } - def meanSquaredError(a: Double, b: Double): Double = { - (a-b)*(a-b) - } - /** * Return a k element list of pairs of RDDs with the first element of each pair - * containing a unique 1/Kth of the data and the second element contain the composite of that. + * containing a unique 1/Kth of the data and the second element contain the compliment of that. 
*/ - def kFoldRdds[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { + def kFold[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { val foldsF = folds.toFloat 1.to(folds).map(fold => (( - new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, false), - seed), - new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, true), - seed) + new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, + complement = false), seed), + new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, + complement = true), seed) ))).toList } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index a3dd766d78fcc..b3cef6d2a6294 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.mllib.util import java.io.File +import scala.math +import scala.util.Random import org.scalatest.FunSuite @@ -136,19 +138,30 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { new LinearRegressionModel(Array(1.0), 0) } - test("kfoldRdd") { + test("kFold") { val data = sc.parallelize(1 to 100, 2) val collectedData = data.collect().sorted - val twoFoldedRdd = MLUtils.kFoldRdds(data, 2, 1) + val twoFoldedRdd = MLUtils.kFold(data, 2, 1) assert(twoFoldedRdd(0)._1.collect().sorted === twoFoldedRdd(1)._2.collect().sorted) assert(twoFoldedRdd(0)._2.collect().sorted === twoFoldedRdd(1)._1.collect().sorted) for (folds <- 2 to 10) { for (seed <- 1 to 5) { - val foldedRdds = MLUtils.kFoldRdds(data, folds, seed) + val foldedRdds = MLUtils.kFold(data, folds, seed) assert(foldedRdds.size === folds) foldedRdds.map{case (test, train) => val result = test.union(train).collect().sorted - assert(test.collect().size > 0, "Non empty test data") + val testSize = test.collect().size.toFloat + assert(testSize > 0, "Non empty test data") + val p = 1 / folds.toFloat + // Within 3 standard deviations of the mean + val range = 3 * math.sqrt(100 * p * (1-p)) + val expected = 100 * p + val lowerBound = expected - range + val upperBound = expected + range + assert(testSize > lowerBound, + "Test data (" + testSize + ") smaller than expected (" + lowerBound +")" ) + assert(testSize < upperBound, + "Test data (" + testSize + ") larger than expected (" + upperBound +")" ) assert(train.collect().size > 0, "Non empty training data") assert(result === collectedData, "Each training+test set combined contains all of the data") From 5a33f1d6d91e1ec1afb62c920abc0443f13725cf Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 23 Feb 2014 17:33:11 -0800 Subject: [PATCH 09/21] Code review follow up. 
--- .../org/apache/spark/util/random/RandomSampler.scala | 2 ++ .../scala/org/apache/spark/mllib/util/MLUtils.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 850c94bef9ea8..deee3a9c6b2c5 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -63,6 +63,8 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) } } + def cloneComplement() = new BernoulliSampler[T](lb, ub, !complement) + override def clone = new BernoulliSampler[T](lb, ub, complement) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index ef2b25a39b81e..4fe021a33eabc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -187,12 +187,12 @@ object MLUtils { */ def kFold[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { val foldsF = folds.toFloat - 1.to(folds).map(fold => (( - new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, - complement = false), seed), - new PartitionwiseSampledRDD(rdd, new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, - complement = true), seed) - ))).toList + 1.to(folds).map { fold => + val sampler = new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, complement = false) + val train = new PartitionwiseSampledRDD(rdd, sampler, seed) + val test = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) + (train, test) + }.toList } /** From 163c5b1a97061515c3c3c93a68fd515b42ea5552 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 3 Mar 2014 19:18:14 -0800 Subject: [PATCH 10/21] code review feedback 1.to -> 1 to and folds -> numFolds --- .../main/scala/org/apache/spark/mllib/util/MLUtils.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 4fe021a33eabc..d3b2170e68fae 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -185,10 +185,10 @@ object MLUtils { * Return a k element list of pairs of RDDs with the first element of each pair * containing a unique 1/Kth of the data and the second element contain the compliment of that. 
*/ - def kFold[T : ClassTag](rdd: RDD[T], folds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { - val foldsF = folds.toFloat - 1.to(folds).map { fold => - val sampler = new BernoulliSampler[T]((fold-1)/foldsF,fold/foldsF, complement = false) + def kFold[T : ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { + val numFoldsF = numFolds.toFloat + (1 to numFolds).map { fold => + val sampler = new BernoulliSampler[T]((fold-1)/numFoldsF,fold/numFoldsF, complement = false) val train = new PartitionwiseSampledRDD(rdd, sampler, seed) val test = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) (train, test) From bb5fa56b4f83c7143033368ac0e64ca5f7d9ead6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 18:44:40 -0700 Subject: [PATCH 11/21] extra line sadness --- .../scala/org/apache/spark/util/random/RandomSamplerSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala index 71874d16ade67..dbe398adc22b8 100644 --- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala @@ -61,7 +61,6 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } } - test("BernoulliSamplerWithRatio") { expecting { for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) { From 7ebe4d571eee2ec11b6d30e9d76e73342a64e1e1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 8 Apr 2014 19:57:48 -0700 Subject: [PATCH 12/21] CR feedback, remove unecessary learners (came back during merge mistake) and insert an empty line --- .../spark/mllib/util/MLUtilsSuite.scala | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index b3cef6d2a6294..e5ede87aaae4a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.util import java.io.File + import scala.math import scala.util.Random @@ -120,24 +121,6 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { } } - // This learner always says everything is 0 - def terribleLearner(trainingData: RDD[LabeledPoint]): RegressionModel = { - object AlwaysZero extends RegressionModel { - override def predict(testData: RDD[Array[Double]]): RDD[Double] = { - testData.map(_ => 0) - } - override def predict(testData: Array[Double]): Double = { - 0 - } - } - AlwaysZero - } - - // Always returns its input - def exactLearner(trainingData: RDD[LabeledPoint]): RegressionModel = { - new LinearRegressionModel(Array(1.0), 0) - } - test("kFold") { val data = sc.parallelize(1 to 100, 2) val collectedData = data.collect().sorted From c5b723f4037d66b60d6110bd25800ad645bd5fca Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 11:07:08 -0700 Subject: [PATCH 13/21] clean up --- .../org/apache/spark/mllib/util/MLUtilsSuite.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index e5ede87aaae4a..ed4fb5c1e2eec 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ 
b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -35,17 +35,6 @@ import org.apache.spark.mllib.util.MLUtils._ class MLUtilsSuite extends FunSuite with LocalSparkContext { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - test("epsilon computation") { assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.") assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.") From e187e3555070711dc68a7b20c032c60969519f83 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 13:48:51 -0700 Subject: [PATCH 14/21] Move { up to same line as whenExecuting(random) in RandomSamplerSuite.scala --- .../spark/util/random/RandomSamplerSuite.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala index dbe398adc22b8..e166787f17544 100644 --- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala @@ -41,8 +41,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar random.nextDouble().andReturn(x) } } - whenExecuting(random) - { + whenExecuting(random) { val sampler = new BernoulliSampler[Int](0.25, 0.55)(random) assert(sampler.sample(a.iterator).toList == List(3, 4, 5)) } @@ -54,8 +53,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar random.nextDouble().andReturn(x) } } - whenExecuting(random) - { + whenExecuting(random) { val sampler = new BernoulliSampler[Int](0.25, 0.55, true)(random) assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9)) } @@ -67,8 +65,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar random.nextDouble().andReturn(x) } } - whenExecuting(random) - { + whenExecuting(random) { val sampler = new BernoulliSampler[Int](0.35)(random) assert(sampler.sample(a.iterator).toList == List(1, 2, 3)) } @@ -80,8 +77,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar random.nextDouble().andReturn(x) } } - whenExecuting(random) - { + whenExecuting(random) { val sampler = new BernoulliSampler[Int](0.25, 0.55, true)(random) assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9)) } @@ -91,8 +87,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar expecting { random.setSeed(10L) } - whenExecuting(random) - { + whenExecuting(random) { val sampler = new BernoulliSampler[Int](0.2)(random) sampler.setSeed(10L) } From c702a96b77a4fb68c789f5fccf7b412f81f997f3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 13:50:33 -0700 Subject: [PATCH 15/21] Fix imports in MLUtils --- .../main/scala/org/apache/spark/mllib/util/MLUtils.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index d3b2170e68fae..d143b8b5cacde 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -17,13 +17,11 @@ package org.apache.spark.mllib.util +import scala.reflect.ClassTag + import breeze.linalg.{Vector => 
BV, DenseVector => BDV, SparseVector => BSV, squaredDistance => breezeSquaredDistance} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import scala.reflect.ClassTag - import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD @@ -33,7 +31,6 @@ import org.apache.spark.util.random.BernoulliSampler import org.jblas.DoubleMatrix import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.regression.RegressionModel /** * Helper methods to load, save and pre-process data used in ML Lib. From 2cb90b32090f3b9f52bfc15ab79c994ef63e670a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 13:55:12 -0700 Subject: [PATCH 16/21] Fix the names in kFold --- .../org/apache/spark/mllib/util/MLUtils.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index d143b8b5cacde..ad37a003d60ca 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -180,16 +180,17 @@ object MLUtils { /** * Return a k element list of pairs of RDDs with the first element of each pair - * containing a unique 1/Kth of the data and the second element contain the compliment of that. + * containing the validation data, a unique 1/Kth of the data and the second + * element, the training data, contain the compliment of that. */ - def kFold[T : ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): List[Pair[RDD[T], RDD[T]]] = { + def kFold[T : ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { val numFoldsF = numFolds.toFloat (1 to numFolds).map { fold => - val sampler = new BernoulliSampler[T]((fold-1)/numFoldsF,fold/numFoldsF, complement = false) - val train = new PartitionwiseSampledRDD(rdd, sampler, seed) - val test = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) - (train, test) - }.toList + val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, complement = false) + val validation = new PartitionwiseSampledRDD(rdd, sampler, seed) + val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) + (validation, training) + }.toArray } /** From 150889c8c8a23a74ca63fbeaf09300de37712b0b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 13:57:17 -0700 Subject: [PATCH 17/21] Fix up error messages in the MLUtilsSuite --- .../org/apache/spark/mllib/util/MLUtilsSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index ed4fb5c1e2eec..92428ff6e44d8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -120,10 +120,10 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { for (seed <- 1 to 5) { val foldedRdds = MLUtils.kFold(data, folds, seed) assert(foldedRdds.size === folds) - foldedRdds.map{case (test, train) => + foldedRdds.map { case (test, train) => val result = test.union(train).collect().sorted val testSize = test.collect().size.toFloat - assert(testSize > 0, "Non empty test data") + assert(testSize > 0, "empty test data") val p = 1 
/ folds.toFloat // Within 3 standard deviations of the mean val range = 3 * math.sqrt(100 * p * (1-p)) @@ -131,12 +131,12 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { val lowerBound = expected - range val upperBound = expected + range assert(testSize > lowerBound, - "Test data (" + testSize + ") smaller than expected (" + lowerBound +")" ) + s"Test data ($testSize) smaller than expected ($lowerBound)" ) assert(testSize < upperBound, - "Test data (" + testSize + ") larger than expected (" + upperBound +")" ) - assert(train.collect().size > 0, "Non empty training data") + s"Test data ($testSize) larger than expected ($upperBound)" ) + assert(train.collect().size > 0, "empty training data") assert(result === collectedData, - "Each training+test set combined contains all of the data") + "Each training+test set combined should contain all of the data.") } // K fold cross validation should only have each element in the test set exactly once assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === From 90896c7eccb85bc15e1a4c275cf3502e4c71bf0a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 15:13:07 -0700 Subject: [PATCH 18/21] New line --- mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index ad37a003d60ca..0fbb606267f9b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -186,7 +186,8 @@ object MLUtils { def kFold[T : ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { val numFoldsF = numFolds.toFloat (1 to numFolds).map { fold => - val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, complement = false) + val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, + complement = false) val validation = new PartitionwiseSampledRDD(rdd, sampler, seed) val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) (validation, training) From 7157ae9ffccf2eacf4e1e6c5ffc5ada0afe83e63 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 20:46:32 -0700 Subject: [PATCH 19/21] CR feedback --- .../spark/util/random/RandomSampler.scala | 5 +++- .../org/apache/spark/mllib/util/MLUtils.scala | 11 ++++----- .../spark/mllib/util/MLUtilsSuite.scala | 24 +++++++++---------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index deee3a9c6b2c5..fd941d2ebed8b 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -63,7 +63,10 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) } } - def cloneComplement() = new BernoulliSampler[T](lb, ub, !complement) + /** + * Return a sampler with is the complement of the range specified of the current sampler. 
+ */ + def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement) override def clone = new BernoulliSampler[T](lb, ub, complement) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 0fbb606267f9b..7112ada039de1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -21,14 +21,13 @@ import scala.reflect.ClassTag import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV, squaredDistance => breezeSquaredDistance} +import org.jblas.DoubleMatrix import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PartitionwiseSampledRDD import org.apache.spark.SparkContext._ import org.apache.spark.util.random.BernoulliSampler - -import org.jblas.DoubleMatrix import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.{Vector, Vectors} @@ -179,13 +178,13 @@ object MLUtils { } /** - * Return a k element list of pairs of RDDs with the first element of each pair + * Return a k element array of pairs of RDDs with the first element of each pair * containing the validation data, a unique 1/Kth of the data and the second - * element, the training data, contain the compliment of that. + * element, the training data, contain the complement of that. */ - def kFold[T : ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { + def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { val numFoldsF = numFolds.toFloat - (1 to numFolds).map { fold => + (1 to numFolds).map { fold => val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, complement = false) val validation = new PartitionwiseSampledRDD(rdd, sampler, seed) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 92428ff6e44d8..1ce4948108a80 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -120,25 +120,25 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { for (seed <- 1 to 5) { val foldedRdds = MLUtils.kFold(data, folds, seed) assert(foldedRdds.size === folds) - foldedRdds.map { case (test, train) => - val result = test.union(train).collect().sorted - val testSize = test.collect().size.toFloat - assert(testSize > 0, "empty test data") + foldedRdds.map { case (validation, training) => + val result = validation.union(training).collect().sorted + val validationSize = validation.collect().size.toFloat + assert(validationSize > 0, "empty validation data") val p = 1 / folds.toFloat // Within 3 standard deviations of the mean - val range = 3 * math.sqrt(100 * p * (1-p)) + val range = 3 * math.sqrt(100 * p * (1 - p)) val expected = 100 * p val lowerBound = expected - range val upperBound = expected + range - assert(testSize > lowerBound, - s"Test data ($testSize) smaller than expected ($lowerBound)" ) - assert(testSize < upperBound, - s"Test data ($testSize) larger than expected ($upperBound)" ) - assert(train.collect().size > 0, "empty training data") + assert(validationSize > lowerBound, + s"Validation data ($validationSize) smaller than expected ($lowerBound)" ) + assert(validationSize < upperBound, + s"Validation data ($validationSize) larger than expected 
($upperBound)" ) + assert(training.collect().size > 0, "empty training data") assert(result === collectedData, - "Each training+test set combined should contain all of the data.") + "Each training+validation set combined should contain all of the data.") } - // K fold cross validation should only have each element in the test set exactly once + // K fold cross validation should only have each element in the validation set exactly once assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === data.collect().sorted) } From 6ddbf05de0b0749cfc90f40e02a1864081aaa676 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 9 Apr 2014 20:58:10 -0700 Subject: [PATCH 20/21] swap training and validation order --- .../main/scala/org/apache/spark/mllib/util/MLUtils.scala | 6 +++--- .../scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index 7112ada039de1..94f892ccbd333 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -179,8 +179,8 @@ object MLUtils { /** * Return a k element array of pairs of RDDs with the first element of each pair - * containing the validation data, a unique 1/Kth of the data and the second - * element, the training data, contain the complement of that. + * containing the training data, a complement of the validation data and the second + * element, the validation data, containing a unique 1/kth of the data. Where k=numFolds. */ def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = { val numFoldsF = numFolds.toFloat @@ -189,7 +189,7 @@ object MLUtils { complement = false) val validation = new PartitionwiseSampledRDD(rdd, sampler, seed) val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) - (validation, training) + (training, validation) }.toArray } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 1ce4948108a80..be30aeafa62c7 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -120,7 +120,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext { for (seed <- 1 to 5) { val foldedRdds = MLUtils.kFold(data, folds, seed) assert(foldedRdds.size === folds) - foldedRdds.map { case (validation, training) => + foldedRdds.map { case (training, validation) => val result = validation.union(training).collect().sorted val validationSize = validation.collect().size.toFloat assert(validationSize > 0, "empty validation data") From e84f2fca08fd07511e107828e15896a93c80b10b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 10 Apr 2014 11:04:50 -0700 Subject: [PATCH 21/21] Fix the test, we should be looking at the second element instead --- .../test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index be30aeafa62c7..61c790f341cc8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -139,7 +139,7 @@ 
class MLUtilsSuite extends FunSuite with LocalSparkContext { "Each training+validation set combined should contain all of the data.") } // K fold cross validation should only have each element in the validation set exactly once - assert(foldedRdds.map(_._1).reduce((x,y) => x.union(y)).collect().sorted === + assert(foldedRdds.map(_._2).reduce((x,y) => x.union(y)).collect().sorted === data.collect().sorted) } }
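
---

For readers who want to try the API this series converges on, here is a minimal, hypothetical usage sketch. The context name, fold count, and seed are illustrative; the `kFold` signature, its `Array[(RDD[T], RDD[T])]` return type, and the (training, validation) pair ordering are as of the final patch above, which is the only thing this sketch relies on.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

// Standalone driver setup, mirroring the "local" context used in the tests above.
val sc = new SparkContext("local", "kFoldExample")
val data = sc.parallelize(1 to 100, 2)

// Three (training, validation) pairs; each validation set holds roughly
// 1/3 of the data, and together the validation sets cover it exactly once.
val folds = MLUtils.kFold(data, 3, 42)

folds.zipWithIndex.foreach { case ((training, validation), i) =>
  // Train on `training` and evaluate on `validation`; here we just report sizes.
  println(s"fold $i: training=${training.count()} validation=${validation.count()}")
}

sc.stop()
```

Because both RDDs in a pair reuse the same seed with complementary `BernoulliSampler` ranges (`(fold - 1) / k` to `fold / k`, with `cloneComplement()` flipping the selection), every element falls in exactly one of the two, so the folds partition the data lazily without caching or shuffling it.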
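The size assertion in the `kFold` test bounds each validation fold by three standard deviations of a Binomial(n, p) count, with n = 100 hard-coded and p = 1/numFolds. A worked instance, assuming numFolds = 3 (an illustrative value within the tested 2–10 range):

```scala
// Worked instance of the bound used in the "kFold" test above; n mirrors the
// test's hard-coded 100 elements, and the fold count of 3 is illustrative.
val n = 100
val p = 1.0 / 3.0                           // one validation fold out of 3
val expected = n * p                        // 33.33...
val range = 3 * math.sqrt(n * p * (1 - p))  // 3 * 4.71... = 14.14...
val (lowerBound, upperBound) = (expected - range, expected + range)
// ≈ (19.2, 47.5): a validation fold of any size in this open interval passes.
```

The bound is loose by design: a per-element Bernoulli draw only approximates a 1/k split within each fold, so the test tolerates binomial noise while still catching an empty or grossly lopsided fold.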