From 2512e67ab84aa7a4f2e4c1a7ecae8870a3453c7c Mon Sep 17 00:00:00 2001 From: Andrew Tulloch Date: Sun, 19 Jan 2014 17:51:00 +0000 Subject: [PATCH 1/8] LocalSparkContext for MLlib --- .../LogisticRegressionSuite.scala | 15 ++---------- .../classification/NaiveBayesSuite.scala | 13 ++--------- .../spark/mllib/classification/SVMSuite.scala | 15 +++--------- .../spark/mllib/clustering/KMeansSuite.scala | 15 ++---------- .../optimization/GradientDescentSuite.scala | 13 ++--------- .../spark/mllib/recommendation/ALSSuite.scala | 14 ++--------- .../spark/mllib/regression/LassoSuite.scala | 16 ++----------- .../regression/LinearRegressionSuite.scala | 14 ++--------- .../regression/RidgeRegressionSuite.scala | 13 ++--------- .../spark/mllib/util/LocalSparkContext.scala | 23 +++++++++++++++++++ 10 files changed, 42 insertions(+), 109 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala index 02ede711372d..155788f85ef1 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.classification +import org.apache.spark.mllib.util.LocalSparkContext import scala.util.Random import scala.collection.JavaConversions._ @@ -66,19 +67,7 @@ object LogisticRegressionSuite { } -class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } - +class LogisticRegressionSuite extends FunSuite with LocalSparkContext with ShouldMatchers { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => prediction != expected.label diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index b615f76e66cf..375196d97fd2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.classification +import org.apache.spark.mllib.util.LocalSparkContext import scala.util.Random import org.scalatest.BeforeAndAfterAll @@ -59,17 +60,7 @@ object NaiveBayesSuite { } } -class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class NaiveBayesSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { val numOfPredictions = predictions.zip(input).count { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala index 3357b86f9b70..bc7abb568a17 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala @@ -25,8 +25,9 @@ import org.scalatest.FunSuite import org.jblas.DoubleMatrix -import org.apache.spark.{SparkException, SparkContext} +import org.apache.spark.SparkException import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.LocalSparkContext object SVMSuite { @@ -58,17 +59,7 @@ object SVMSuite { } -class SVMSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class SVMSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 73657cac893c..4ef1d1f64ff0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -21,20 +21,9 @@ package org.apache.spark.mllib.clustering import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import org.apache.spark.SparkContext +import org.apache.spark.mllib.util.LocalSparkContext - -class KMeansSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class KMeansSuite extends FunSuite with LocalSparkContext { val EPSILON = 1e-4 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala index a6028a1e981d..a453de6767aa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers import org.apache.spark.SparkContext import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.LocalSparkContext object GradientDescentSuite { @@ -62,17 +63,7 @@ object GradientDescentSuite { } } -class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers { test("Assert the loss is decreasing.") { val nPoints = 10000 diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 4e8dbde65801..28a27b1d09e6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import 
org.apache.spark.SparkContext +import org.apache.spark.mllib.util.LocalSparkContext import org.jblas._ @@ -73,17 +73,7 @@ object ALSSuite { } -class ALSSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class ALSSuite extends FunSuite with LocalSparkContext { test("rank-1 matrices") { testALS(50, 100, 1, 15, 0.7, 0.3) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala index b2c8df97a82a..64e4cbb860f6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala @@ -22,21 +22,9 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.SparkContext -import org.apache.spark.mllib.util.LinearDataGenerator +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} - -class LassoSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class LassoSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala index 406afbaa3e2c..648d89bc8c3e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala @@ -21,19 +21,9 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.SparkContext -import org.apache.spark.mllib.util.LinearDataGenerator +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} -class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll { - @transient private var sc: SparkContext = _ - - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class LinearRegressionSuite extends FunSuite with LocalSparkContext { def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala index 1d6a10b66e89..7ec53eb7722c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala @@ -23,19 +23,10 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.SparkContext -import org.apache.spark.mllib.util.LinearDataGenerator +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} -class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll { - @transient 
private var sc: SparkContext = _ - override def beforeAll() { - sc = new SparkContext("local", "test") - } - - override def afterAll() { - sc.stop() - System.clearProperty("spark.driver.port") - } +class RidgeRegressionSuite extends FunSuite with LocalSparkContext { def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = { predictions.zip(input).map { case (prediction, expected) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala new file mode 100644 index 000000000000..7d840043e5c6 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -0,0 +1,23 @@ +package org.apache.spark.mllib.util + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkContext + +trait LocalSparkContext extends BeforeAndAfterAll { self: Suite => + @transient var sc: SparkContext = _ + + override def beforeAll() { + sc = new SparkContext("local", "test") + super.beforeAll() + } + + override def afterAll() { + if (sc != null) { + sc.stop() + } + System.clearProperty("spark.driver.port") + super.afterAll() + } +} From 05649855853cb51c53aa8875306a8f80975e61ed Mon Sep 17 00:00:00 2001 From: Andrew Tulloch Date: Tue, 21 Jan 2014 13:34:59 +0000 Subject: [PATCH 2/8] Fixed import order --- .../spark/mllib/classification/LogisticRegressionSuite.scala | 2 +- .../apache/spark/mllib/classification/NaiveBayesSuite.scala | 3 +-- .../org/apache/spark/mllib/recommendation/ALSSuite.scala | 4 ++-- .../apache/spark/mllib/regression/LinearRegressionSuite.scala | 1 - .../apache/spark/mllib/regression/RidgeRegressionSuite.scala | 1 - 5 files changed, 4 insertions(+), 7 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala index 155788f85ef1..05322b024d5f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.classification -import org.apache.spark.mllib.util.LocalSparkContext import scala.util.Random import scala.collection.JavaConversions._ @@ -27,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers import org.apache.spark.SparkContext import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.LocalSparkContext object LogisticRegressionSuite { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala index 375196d97fd2..9dd6c79ee6ad 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala @@ -17,14 +17,13 @@ package org.apache.spark.mllib.classification -import org.apache.spark.mllib.util.LocalSparkContext import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.SparkContext +import org.apache.spark.mllib.util.LocalSparkContext object NaiveBayesSuite { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala 
index 28a27b1d09e6..5dcec7dc3eb9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -23,10 +23,10 @@ import scala.util.Random import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import org.apache.spark.mllib.util.LocalSparkContext - import org.jblas._ +import org.apache.spark.mllib.util.LocalSparkContext + object ALSSuite { def generateRatingsAsJavaList( diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala index 648d89bc8c3e..281f9df36ddb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.mllib.regression import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import org.apache.spark.SparkContext import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} class LinearRegressionSuite extends FunSuite with LocalSparkContext { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala index 7ec53eb7722c..67dd06cc0f5e 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala @@ -22,7 +22,6 @@ import org.jblas.DoubleMatrix import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite -import org.apache.spark.SparkContext import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} From a26ac906a7739262d67c9fd75d849f2b067b4287 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 27 Jan 2014 11:15:51 -0800 Subject: [PATCH 3/8] Merge pull request #460 from srowen/RandomInitialALSVectors Choose initial user/item vectors uniformly on the unit sphere ...rather than within the unit square to possibly avoid bias in the initial state and improve convergence. The current implementation picks the N vector elements uniformly at random from [0,1). This means they all point into one quadrant of the vector space. As N gets just a little large, the vectors tend strongly to point into the "corner", towards (1,1,1...,1). The vectors are not unit vectors either. I suggest choosing the elements as Gaussian ~ N(0,1) and normalizing. This gets you uniform random choices on the unit sphere, which is more what's of interest here. It has worked a little better for me in the past. This is pretty minor but wanted to warm up suggesting a few tweaks to ALS. Please excuse my Scala, pretty new to it.
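In code, the proposed initialization amounts to the following standalone sketch (an illustration of the idea only, not the patch's exact code; the actual change to ALS.randomFactor below also takes absolute values so that all factor components are nonnegative, per the PR discussion):

    import scala.math.sqrt
    import scala.util.Random

    // Sketch: draw each component from N(0,1), then normalize; the resulting
    // direction is uniformly distributed over the unit sphere.
    def randomUnitVector(rank: Int, rand: Random): Array[Double] = {
      val gaussian = Array.fill(rank)(rand.nextGaussian())
      val norm = sqrt(gaussian.map(x => x * x).sum)
      gaussian.map(_ / norm)
    }
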
Author: Sean Owen == Merge branch commits == commit 492b13a7469e5a4ed7591ee8e56d8bd7570dfab6 Author: Sean Owen Date: Mon Jan 27 08:05:25 2014 +0000 Style: spaces around binary operators commit ce2b5b5a4fefa0356875701f668f01f02ba4d87e Author: Sean Owen Date: Sun Jan 19 22:50:03 2014 +0000 Generate factors with all positive components, per discussion in https://github.com/apache/incubator-spark/pull/460 commit b6f7a8a61643a8209e8bc662e8e81f2d15c710c7 Author: Sean Owen Date: Sat Jan 18 15:54:42 2014 +0000 Choose initial user/item vectors uniformly on the unit sphere rather than within the unit square to possibly avoid bias in the initial state and improve convergence --- .../org/apache/spark/mllib/recommendation/ALS.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 3e93402adffa..5c4388d72b97 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.recommendation import scala.collection.mutable.{ArrayBuffer, BitSet} +import scala.math.{abs, sqrt} import scala.util.Random import scala.util.Sorting @@ -301,7 +302,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l * Make a random factor vector with the given random. */ private def randomFactor(rank: Int, rand: Random): Array[Double] = { - Array.fill(rank)(rand.nextDouble) + // Choose a unit vector uniformly at random from the unit sphere, but from the + // "first quadrant" where all elements are nonnegative. This can be done by choosing + // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing. + // This appears to create factorizations that have a slightly better reconstruction + // (<1%) compared picking elements uniformly at random in [0,1]. + val factor = Array.fill(rank)(abs(rand.nextGaussian())) + val norm = sqrt(factor.map(x => x * x).sum) + factor.map(x => x / norm) } /** From f27441a1213459912acf4252e5e5736fb7cb54de Mon Sep 17 00:00:00 2001 From: Chen Chao Date: Wed, 19 Feb 2014 22:06:35 -0800 Subject: [PATCH 4/8] MLLIB-24: url of "Collaborative Filtering for Implicit Feedback Datasets" in ALS is invalid now url of "Collaborative Filtering for Implicit Feedback Datasets" is invalid now. A new url is provided. 
http://research.yahoo.com/files/HuKorenVolinsky-ICDM08.pdf Author: Chen Chao Closes #619 from CrazyJvm/master and squashes the following commits: a0b54e4 [Chen Chao] change url to IEEE 9e0e9f0 [Chen Chao] correct spell mistale fcfab5d [Chen Chao] wrap line to to fit within 100 chars 590d56e [Chen Chao] url error --- .../main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 5c4388d72b97..71d8a5c4aa0e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -81,7 +81,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * * For implicit preference data, the algorithm used is based on * "Collaborative Filtering for Implicit Feedback Datasets", available at - * [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here. + * [[http://dx.doi.org/10.1109/ICDM.2008.22]], adapted for the blocked approach used here. * * Essentially instead of finding the low-rank approximations to the rating matrix `R`, * this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0 From 6340a182c5dec27ef67c4385d93e3ab5681bc5d4 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Wed, 19 Feb 2014 23:44:53 -0800 Subject: [PATCH 5/8] MLLIB-22. Support negative implicit input in ALS I'm back with another less trivial suggestion for ALS: In ALS for implicit feedback, input values are treated as weights on squared-errors in a loss function (or rather, the weight is a simple function of the input r, like c = 1 + alpha*r). The paper on which it's based assumes that the input is positive. Indeed, if the input is negative, it will create a negative weight on squared-errors, which causes things to go haywire. The optimization will try to make the error in a cell as large as possible, and the result is silently bogus. There is a good use case for negative input values though. Implicit feedback is usually collected from signals of positive interaction like a view or like or buy, but equally, can come from "not interested" signals. The natural representation is negative values. The algorithm can be extended quite simply to provide a sound interpretation of these values: negative values should encourage the factorization to come up with 0 for cells with large negative input values, just as much as positive values encourage it to come up with 1. The implications for the algorithm are simple: * the confidence function value must not be negative, and so can become 1 + alpha*|r| * the matrix P should have a value 1 where the input R is _positive_, not merely where it is non-zero. Actually, that's what the paper already says, it's just that we can't assume P = 1 when a cell in R is specified anymore, since it may be negative. This in turn entails just a few lines of code change in `ALS.scala`: * `rs(i)` becomes `abs(rs(i))` * When constructing `userXy(us(i))`, it's implicitly only adding where P is 1. That had been true for any us(i) that is iterated over, before, since these are exactly the ones for which P is 1.
But now P is zero where rs(i) <= 0, and should not be added. I think it's a safe change because: * It doesn't change any existing behavior (unless you're using negative values, in which case results are already borked) * It's the simplest direct extension of the paper's algorithm * (I've used it to good effect in production FWIW) Tests included. I tweaked minor things en route: * `ALS.scala` javadoc writes "R = Xt*Y" when the paper and rest of code defines it as "R = X*Yt" * RMSE in the ALS tests uses a confidence-weighted mean, but the denominator is not actually the sum of weights. Excuse my Scala style; I'm sure it needs tweaks. Author: Sean Owen Closes #500 from srowen/ALSNegativeImplicitInput and squashes the following commits: cf902a9 [Sean Owen] Support negative implicit input in ALS 953be1c [Sean Owen] Make weighted RMSE in ALS test actually weighted; adjust comment about R = X*Yt --- .../spark/mllib/recommendation/ALS.scala | 14 ++++++-- .../mllib/recommendation/JavaALSSuite.java | 32 +++++++++++++------ .../spark/mllib/recommendation/ALSSuite.scala | 27 ++++++++++------ 3 files changed, 52 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 71d8a5c4aa0e..e70281038b68 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -64,7 +64,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * Alternating Least Squares matrix factorization. * * ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices, - * `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices. + * `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices. * The general approach is iterative. During each iteration, one of the factor matrices is held * constant, while the other is solved for using least squares. The newly-solved factor matrix is * then held constant while solving for the other factor matrix. @@ -381,8 +381,16 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l userXtX(us(i)).addi(tempXtX) SimpleBlas.axpy(rs(i), x, userXy(us(i))) case true => - userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i))) - SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i))) + // Extension to the original paper to handle rs(i) < 0. confidence is a function + // of |rs(i)| instead so that it is never negative: + val confidence = 1 + alpha * abs(rs(i)) + userXtX(us(i)).addi(tempXtX.mul(confidence - 1)) + // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) + // means we try to reconstruct 0.
We add terms only where P = 1, so, term below + // is now only added for rs(i) > 0: + if (rs(i) > 0) { + SimpleBlas.axpy(confidence, x, userXy(us(i))) + } } } } diff --git a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java index b40f552e0d0a..b150334deb06 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java @@ -19,7 +19,6 @@ import java.io.Serializable; import java.util.List; -import java.lang.Math; import org.junit.After; import org.junit.Assert; @@ -46,7 +45,7 @@ public void tearDown() { System.clearProperty("spark.driver.port"); } - void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, + static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features, DoubleMatrix trueRatings, double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) { DoubleMatrix predictedU = new DoubleMatrix(users, features); List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect(); @@ -84,15 +83,15 @@ void validatePrediction(MatrixFactorizationModel model, int users, int products, for (int p = 0; p < products; ++p) { double prediction = predictedRatings.get(u, p); double truePref = truePrefs.get(u, p); - double confidence = 1.0 + /* alpha = */ 1.0 * trueRatings.get(u, p); + double confidence = 1.0 + /* alpha = */ 1.0 * Math.abs(trueRatings.get(u, p)); double err = confidence * (truePref - prediction) * (truePref - prediction); sqErr += err; - denom += 1.0; + denom += confidence; } } double rmse = Math.sqrt(sqErr / denom); Assert.assertTrue(String.format("Confidence-weighted RMSE=%2.4f above threshold of %2.2f", - rmse, matchThreshold), Math.abs(rmse) < matchThreshold); + rmse, matchThreshold), rmse < matchThreshold); } } @@ -103,7 +102,7 @@ public void runALSUsingStaticMethods() { int users = 50; int products = 100; scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7, false); + users, products, features, 0.7, false, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations); @@ -117,7 +116,7 @@ public void runALSUsingConstructor() { int users = 100; int products = 200; scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7, false); + users, products, features, 0.7, false, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); @@ -134,7 +133,7 @@ public void runImplicitALSUsingStaticMethods() { int users = 80; int products = 160; scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7, true); + users, products, features, 0.7, true, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); @@ -148,7 +147,7 @@ public void runImplicitALSUsingConstructor() { int users = 100; int products = 200; scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( - users, products, features, 0.7, true); + users, products, features, 0.7, true, false); JavaRDD<Rating> data = sc.parallelize(testData._1()); @@ -158,4 +157,19 @@ public void runImplicitALSUsingConstructor() { .run(data.rdd()); validatePrediction(model, users, products, features, 
testData._2(), 0.4, true, testData._3()); } + + @Test + public void runImplicitALSWithNegativeWeight() { + int features = 2; + int iterations = 15; + int users = 80; + int products = 160; + scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList( + users, products, features, 0.7, true, true); + + JavaRDD<Rating> data = sc.parallelize(testData._1()); + MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations); + validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3()); + } + } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 5dcec7dc3eb9..45e7d2db00c4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -18,9 +18,9 @@ package org.apache.spark.mllib.recommendation import scala.collection.JavaConversions._ +import scala.math.abs import scala.util.Random -import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite import org.jblas._ @@ -34,7 +34,8 @@ object ALSSuite { products: Int, features: Int, samplingRate: Double, - implicitPrefs: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = { + implicitPrefs: Boolean, + negativeWeights: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = { val (sampledRatings, trueRatings, truePrefs) = generateRatings(users, products, features, samplingRate, implicitPrefs) (seqAsJavaList(sampledRatings), trueRatings, truePrefs) @@ -45,7 +46,8 @@ object ALSSuite { products: Int, features: Int, samplingRate: Double, - implicitPrefs: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { + implicitPrefs: Boolean = false, + negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { val rand = new Random(42) // Create a random matrix with uniform values from -1 to 1 @@ -56,7 +58,9 @@ object ALSSuite { val productMatrix = randomMatrix(features, products) val (trueRatings, truePrefs) = implicitPrefs match { case true => - val raw = new DoubleMatrix(users, products, Array.fill(users * products)(rand.nextInt(10).toDouble): _*) + // Generate raw values from [0,9], or if negativeWeights, from [-2,7] + val raw = new DoubleMatrix(users, products, + Array.fill(users * products)((if (negativeWeights) -2 else 0) + rand.nextInt(10).toDouble): _*) val prefs = new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*) (raw, prefs) case false => (userMatrix.mmul(productMatrix), null) @@ -107,6 +111,10 @@ class ALSSuite extends FunSuite with LocalSparkContext { testALS(100, 200, 2, 15, 0.7, 0.4, true, true) } + test("rank-2 matrices implicit negative") { + testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. 
* @@ -118,13 +126,14 @@ class ALSSuite extends FunSuite with LocalSparkContext { * @param matchThreshold max difference allowed to consider a predicted rating correct * @param implicitPrefs flag to test implicit feedback * @param bulkPredict flag to test bulk prediciton + * @param negativeWeights whether the generated data can contain negative values */ def testALS(users: Int, products: Int, features: Int, iterations: Int, samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, - bulkPredict: Boolean = false) + bulkPredict: Boolean = false, negativeWeights: Boolean = false) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, - features, samplingRate, implicitPrefs) + features, samplingRate, implicitPrefs, negativeWeights) val model = implicitPrefs match { case false => ALS.train(sc.parallelize(sampledRatings), features, iterations) case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations) @@ -166,13 +175,13 @@ class ALSSuite extends FunSuite with LocalSparkContext { for (u <- 0 until users; p <- 0 until products) { val prediction = predictedRatings.get(u, p) val truePref = truePrefs.get(u, p) - val confidence = 1 + 1.0 * trueRatings.get(u, p) + val confidence = 1 + 1.0 * abs(trueRatings.get(u, p)) val err = confidence * (truePref - prediction) * (truePref - prediction) sqErr += err - denom += 1 + denom += confidence } val rmse = math.sqrt(sqErr / denom) - if (math.abs(rmse) > matchThreshold) { + if (rmse > matchThreshold) { fail("Model failed to predict RMSE: %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format( rmse, truePrefs, predictedRatings, predictedU, predictedP)) } From 46fe4930a980572b4b207576110d36f27283feb3 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 18 Mar 2014 15:14:13 -0700 Subject: [PATCH 6/8] [SPARK-1260]: faster construction of features with intercept The current implementation uses `Array(1.0, features: _*)` to construct a new array with intercept. This is not efficient for big arrays because `Array.apply` uses a for loop that iterates over the arguments. `Array.+:` is a better choice here. Also, I don't see a reason to set initial weights to ones. So I set them to zeros. 
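For illustration, a minimal sketch of the two constructions the commit message contrasts, using a hypothetical three-element feature vector (values are made up):

    val features = Array(0.5, -1.2, 3.4)       // hypothetical feature vector
    val viaVarargs = Array(1.0, features: _*)  // Array.apply iterates over every argument
    val viaPrepend = features.+:(1.0)          // i.e. 1.0 +: features; avoids the per-argument loop
    // both yield Array(1.0, 0.5, -1.2, 3.4)
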
JIRA: https://spark-project.atlassian.net/browse/SPARK-1260 Author: Xiangrui Meng Closes #161 from mengxr/sgd and squashes the following commits: b5cfc53 [Xiangrui Meng] set default weights to zeros a1439c2 [Xiangrui Meng] faster construction of features with intercept --- .../mllib/regression/GeneralizedLinearAlgorithm.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index f98b0b536dea..b9621530efa2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -119,7 +119,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] */ def run(input: RDD[LabeledPoint]) : M = { val nfeatures: Int = input.first().features.length - val initialWeights = Array.fill(nfeatures)(1.0) + val initialWeights = new Array[Double](nfeatures) run(input, initialWeights) } @@ -134,15 +134,15 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] throw new SparkException("Input validation failed.") } - // Add a extra variable consisting of all 1.0's for the intercept. + // Prepend an extra variable consisting of all 1.0's for the intercept. val data = if (addIntercept) { - input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*))) + input.map(labeledPoint => (labeledPoint.label, labeledPoint.features.+:(1.0))) } else { input.map(labeledPoint => (labeledPoint.label, labeledPoint.features)) } val initialWeightsWithIntercept = if (addIntercept) { - Array(1.0, initialWeights:_*) + initialWeights.+:(1.0) } else { initialWeights } From a899894e5321abb4383e2c769309fe99d130a2fb Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 13 Mar 2014 00:43:19 -0700 Subject: [PATCH 7/8] [SPARK-1237, 1238] Improve the computation of YtY for implicit ALS Computing YtY can be implemented using BLAS's DSPR operations instead of generating y_i y_i^T and then combining them. The latter generates many k-by-k matrices. On the movielens data, this change improves the performance by 10-20%. The algorithm remains the same, verified by computing RMSE on the movielens data. To compare the results, I also added an option to set a random seed in ALS. JIRA: 1. https://spark-project.atlassian.net/browse/SPARK-1237 2. 
https://spark-project.atlassian.net/browse/SPARK-1238 Author: Xiangrui Meng Closes #131 from mengxr/als and squashes the following commits: ed00432 [Xiangrui Meng] minor changes d984623 [Xiangrui Meng] minor changes 2fc1641 [Xiangrui Meng] remove commented code 4c7cde2 [Xiangrui Meng] allow specifying a random seed in ALS 200bef0 [Xiangrui Meng] optimize computeYtY and updateBlock Conflicts: mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala --- .../spark/mllib/recommendation/ALS.scala | 177 ++++++++++++------ .../spark/mllib/recommendation/ALSSuite.scala | 15 +- 2 files changed, 135 insertions(+), 57 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index e70281038b68..44db51cda610 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -88,10 +88,15 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user * preferences rather than explicit ratings given to items. */ -class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double, - var implicitPrefs: Boolean, var alpha: Double) - extends Serializable with Logging -{ +class ALS private ( + var numBlocks: Int, + var rank: Int, + var iterations: Int, + var lambda: Double, + var implicitPrefs: Boolean, + var alpha: Double, + var seed: Long = System.nanoTime() + ) extends Serializable with Logging { def this() = this(-1, 10, 10, 0.01, false, 1.0) /** @@ -131,6 +136,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l this } + /** Sets a random seed to have deterministic results. */ + def setSeed(seed: Long): ALS = { + this.seed = seed + this + } + /** * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. 
@@ -152,9 +163,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock) val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock) - // Initialize user and product factors randomly, but use a deterministic seed for each partition - // so that fault recovery works - val seedGen = new Random() + // Initialize user and product factors randomly, but use a deterministic seed for each + // partition so that fault recovery works + val seedGen = new Random(seed) val seed1 = seedGen.nextInt() val seed2 = seedGen.nextInt() // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable @@ -209,21 +220,46 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l */ def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { if (implicitPrefs) { - Option( - factors.flatMapValues { case factorArray => - factorArray.view.map { vector => - val x = new DoubleMatrix(vector) - x.mmul(x.transpose()) - } - }.reduceByKeyLocally((a, b) => a.addi(b)) - .values - .reduce((a, b) => a.addi(b)) - ) + val n = rank * (rank + 1) / 2 + val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { + Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L)) + L + }, combOp = (L1, L2) => { + L1.addi(L2) + }) + val YtY = new DoubleMatrix(rank, rank) + fillFullMatrix(LYtY, YtY) + Option(YtY) } else { None } } + /** + * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's DSPR. + * + * @param L the lower triangular part of the matrix packed in an array (row major) + */ + private def dspr(alpha: Double, x: DoubleMatrix, L: DoubleMatrix) = { + val n = x.length + var i = 0 + var j = 0 + var idx = 0 + var axi = 0.0 + val xd = x.data + val Ld = L.data + while (i < n) { + axi = alpha * xd(i) + j = 0 + while (j <= i) { + Ld(idx) += axi * xd(j) + j += 1 + idx += 1 + } + i += 1 + } + } + /** * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs */ @@ -373,7 +409,8 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l for (productBlock <- 0 until numBlocks) { for (p <- 0 until blockFactors(productBlock).length) { val x = new DoubleMatrix(blockFactors(productBlock)(p)) - fillXtX(x, tempXtX) + tempXtX.fill(0.0) + dspr(1.0, x, tempXtX) val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) for (i <- 0 until us.length) { implicitPrefs match { @@ -384,7 +421,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l // Extension to the original paper to handle rs(i) < 0. confidence is a function // of |rs(i)| instead so that it is never negative: val confidence = 1 + alpha * abs(rs(i)) - userXtX(us(i)).addi(tempXtX.mul(confidence - 1)) + SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) // means we try to reconstruct 0. 
We add terms only where P = 1, so, term below // is now only added for rs(i) > 0: @@ -397,35 +434,16 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l } // Solve the least-squares problem for each user and return the new feature vectors - userXtX.zipWithIndex.map{ case (triangularXtX, index) => + Array.range(0, numUsers).map { index => // Compute the full XtX matrix from the lower-triangular part we got above - fillFullMatrix(triangularXtX, fullXtX) + fillFullMatrix(userXtX(index), fullXtX) // Add regularization (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite implicitPrefs match { case false => Solve.solvePositive(fullXtX, userXy(index)).data - case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data - } - } - } - - /** - * Set xtxDest to the lower-triangular part of x transpose * x. For efficiency in summing - * these matrices, we store xtxDest as only rank * (rank+1) / 2 values, namely the values - * at (0,0), (1,0), (1,1), (2,0), (2,1), (2,2), etc in that order. - */ - private def fillXtX(x: DoubleMatrix, xtxDest: DoubleMatrix) { - var i = 0 - var pos = 0 - while (i < x.length) { - var j = 0 - while (j <= i) { - xtxDest.data(pos) = x.data(i) * x.data(j) - pos += 1 - j += 1 + case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data } - i += 1 } } @@ -452,9 +470,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l /** - * Top-level methods for calling Alternating Least Squares (ALS) matrix factorizaton. + * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization. */ object ALS { + /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -467,15 +486,39 @@ object ALS { * @param iterations number of iterations of ALS (recommended: 10-20) * @param lambda regularization factor (recommended: 0.01) * @param blocks level of parallelism to split computation into + * @param seed random seed */ def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, - blocks: Int) - : MatrixFactorizationModel = - { + blocks: Int, + seed: Long + ): MatrixFactorizationModel = { + new ALS(blocks, rank, iterations, lambda, false, 1.0, seed).run(ratings) + } + + /** + * Train a matrix factorization model given an RDD of ratings given by users to some products, + * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the + * product of two lower-rank matrices of a given rank (number of features). To solve for these + * features, we run a given number of iterations of ALS. This is done using a level of + * parallelism given by `blocks`. 
+ * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + */ + def train( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int + ): MatrixFactorizationModel = { new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings) } @@ -492,8 +535,7 @@ object ALS { * @param lambda regularization factor (recommended: 0.01) */ def train(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double) - : MatrixFactorizationModel = - { + : MatrixFactorizationModel = { train(ratings, rank, iterations, lambda, -1) } @@ -509,8 +551,7 @@ object ALS { * @param iterations number of iterations of ALS (recommended: 10-20) */ def train(ratings: RDD[Rating], rank: Int, iterations: Int) - : MatrixFactorizationModel = - { + : MatrixFactorizationModel = { train(ratings, rank, iterations, 0.01, -1) } @@ -527,6 +568,7 @@ object ALS { * @param lambda regularization factor (recommended: 0.01) * @param blocks level of parallelism to split computation into * @param alpha confidence parameter (only applies when immplicitPrefs = true) + * @param seed random seed */ def trainImplicit( ratings: RDD[Rating], @@ -534,9 +576,34 @@ object ALS { iterations: Int, lambda: Double, blocks: Int, - alpha: Double) - : MatrixFactorizationModel = - { + alpha: Double, + seed: Long + ): MatrixFactorizationModel = { + new ALS(blocks, rank, iterations, lambda, true, alpha, seed).run(ratings) + } + + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. 
+ * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double + ): MatrixFactorizationModel = { new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings) } @@ -553,8 +620,7 @@ object ALS { * @param lambda regularization factor (recommended: 0.01) */ def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double) - : MatrixFactorizationModel = - { + : MatrixFactorizationModel = { trainImplicit(ratings, rank, iterations, lambda, -1, alpha) } @@ -571,8 +637,7 @@ object ALS { * @param iterations number of iterations of ALS (recommended: 10-20) */ def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int) - : MatrixFactorizationModel = - { + : MatrixFactorizationModel = { trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 45e7d2db00c4..5aab9aba8f9c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -23,9 +23,10 @@ import scala.util.Random import org.scalatest.FunSuite -import org.jblas._ +import org.jblas.DoubleMatrix import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.SparkContext._ object ALSSuite { @@ -115,6 +116,18 @@ class ALSSuite extends FunSuite with LocalSparkContext { testALS(100, 200, 2, 15, 0.7, 0.4, true, false, true) } + test("pseudorandomness") { + val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2) + val model11 = ALS.train(ratings, 5, 1, 1.0, 2, 1) + val model12 = ALS.train(ratings, 5, 1, 1.0, 2, 1) + val u11 = model11.userFeatures.values.flatMap(_.toList).collect().toList + val u12 = model12.userFeatures.values.flatMap(_.toList).collect().toList + val model2 = ALS.train(ratings, 5, 1, 1.0, 2, 2) + val u2 = model2.userFeatures.values.flatMap(_.toList).collect().toList + assert(u11 == u12) + assert(u11 != u2) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * From d8928ea5b3efda38f818c3b13e38092175f50dee Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 19 Mar 2014 16:46:41 -0700 Subject: [PATCH 8/8] add Apache header to LocalSparkContext --- .../spark/mllib/util/LocalSparkContext.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala index 7d840043e5c6..212fbe9288f0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.util import org.scalatest.Suite
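For reference, a minimal sketch (hypothetical suite name) of how the suites in this series consume the trait: mixing in LocalSparkContext supplies sc, created in beforeAll and stopped in afterAll, so individual suites no longer need their own setup and teardown:

    import org.scalatest.FunSuite
    import org.apache.spark.mllib.util.LocalSparkContext

    class ExampleSuite extends FunSuite with LocalSparkContext {
      test("sc is provided by the mixed-in trait") {
        // sc is the shared local SparkContext managed by the trait
        val rdd = sc.parallelize(1 to 100)
        assert(rdd.count() === 100)
      }
    }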