From a68ac10932b2d9ae50b3a27b34bf65fce7e024e1 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 19:07:27 -0400 Subject: [PATCH 01/18] A nonnegative least-squares solver. --- .../spark/mllib/optimization/NNLSbyPCG.scala | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala new file mode 100644 index 0000000000000..485c1e46ea8da --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.jblas.{DoubleMatrix, SimpleBlas} +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Object used to solve nonnegative least squares problems using a modified projected gradient method. + */ +@DeveloperApi +object NNLSbyPCG { + /** + * Solve a least squares problem, possibly with nonnegativity constraints, by a modified + * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. + */ + def solve(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = { + val n = atb.rows + val scratch = new DoubleMatrix(n, 1) + + // find the optimal unconstrained step + def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = { + val top = SimpleBlas.dot(dir, resid) + SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch) + top / SimpleBlas.dot(scratch, dir) + } + + // stopping condition + def stop(step: Double, ndir: Double, nx: Double): Boolean = { + ((step != step) + || (step < 1e-6) + || (ndir < 1e-12 * nx)) + } + + val grad = new DoubleMatrix(n, 1) + val x = new DoubleMatrix(n, 1) + val dir = new DoubleMatrix(n, 1) + val lastdir = new DoubleMatrix(n, 1) + val resid = new DoubleMatrix(n, 1) + var lastnorm = 0.0 + var iterno = 0 + var lastwall = 0 + var i = 0 + while (iterno < 40000) { + // find the residual + SimpleBlas.gemv(1.0, ata, x, 0.0, resid) + SimpleBlas.axpy(-1.0, atb, resid) + SimpleBlas.copy(resid, grad) + + // project the gradient + if (nonnegative) { + i = 0 + while (i < n) { + if (grad.data(i) > 0.0 && x.data(i) == 0.0) + grad.data(i) = 0.0 + i = i + 1 + } + } + val ngrad = SimpleBlas.dot(grad, grad) + + SimpleBlas.copy(grad, dir) + + // use a CG direction under certain conditions + var step = steplen(grad, resid) + var ndir = 0.0 + val nx = SimpleBlas.dot(x, x) + if (iterno > lastwall + 1) { + val alpha = ngrad / lastnorm + SimpleBlas.axpy(alpha, lastdir, dir) + val dstep = steplen(dir, resid) + ndir = SimpleBlas.dot(dir, dir) + if (stop(dstep, ndir, nx)) { + // reject the CG step if it could lead to premature termination + SimpleBlas.copy(grad, dir) + ndir = SimpleBlas.dot(dir, dir) + } else { + step = dstep + } + } else { + ndir = SimpleBlas.dot(dir, dir) + } + + // terminate? + if (stop(step, ndir, nx)) { + return x.data + } + + // don't run through the walls + if (nonnegative) { + i = 0 + while (i < n) { + if (step * dir.data(i) > x.data(i)) + step = Math.min(step, x.data(i) / dir.data(i)) + i = i + 1 + } + } + + // take the step + i = 0 + while (i < n) { + if (nonnegative) { + if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { + x.data(i) = 0 + lastwall = iterno + } else x.data(i) -= step * dir.data(i) + } else { + x.data(i) -= step * dir.data(i) + } + i = i + 1 + } + + iterno = iterno + 1 + SimpleBlas.copy(dir, lastdir) + lastnorm = ngrad + } + x.data + } + +} From 6cb563c19e7ae554724f9832f83a4df96d3d92c1 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 19:07:40 -0400 Subject: [PATCH 02/18] Tests for the nonnegative least squares solver. --- .../spark/mllib/optimization/NNLSSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala new file mode 100644 index 0000000000000..f6f054741721a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.util.Random + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.mllib.util.LocalSparkContext + +import org.jblas.DoubleMatrix +import org.jblas.SimpleBlas + +class NNLSSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + test("NNLSbyPCG: exact solution case") { + val A = new DoubleMatrix(20, 20) + val b = new DoubleMatrix(20, 1) + val rand = new Random(12345) + for (i <- 0 until 20; j <- 0 until 20) { + val aij = rand.nextDouble() + A.put(i, j, aij) + b.put(i, b.get(i, 0) + aij) + } + + val ata = new DoubleMatrix(20, 20) + val atb = new DoubleMatrix(20, 1) + for (i <- 0 until 20; j <- 0 until 20; k <- 0 until 20) { + ata.put(i, j, ata.get(i, j) + A.get(k, i) * A.get(k, j)) + } + for (i <- 0 until 20; j <- 0 until 20) { + atb.put(i, atb.get(i, 0) + A.get(j, i) * b.get(j)) + } + + val x = NNLSbyPCG.solve(ata, atb, true) + assert(x.length == 20) + var error = 0.0 + for (i <- 0 until 20) { + error = error + (x(i) - 1) * (x(i) - 1) + assert(Math.abs(x(i) - 1) < 1e-3) + } + assert(error < 1e-2) + } +} From f5dbf4d5fb5fe5d09cf86e0fba388efc68acadd0 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 19:08:31 -0400 Subject: [PATCH 03/18] Teach ALS how to use the NNLS solver. --- .../spark/mllib/recommendation/ALS.scala | 195 +++++++++++++++++- 1 file changed, 193 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 1f5c746a3457c..d6fa537d6b08b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -32,6 +32,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.SparkContext._ +import org.apache.spark.util.Utils +import org.apache.spark.mllib.optimization.NNLSbyPCG /** * Out-link information for a user or product block. This includes the original user/product IDs @@ -156,6 +158,18 @@ class ALS private ( this } + /** If true, do alternating nonnegative least squares. */ + private var nonnegative = false + + /** + * Set whether the least-squares problems solved at each iteration should have + * nonnegativity constraints. + */ + def setNonnegative(b: Boolean): ALS = { + this.nonnegative = b + this + } + /** * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. @@ -498,10 +512,128 @@ class ALS private ( (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite if (implicitPrefs) { - Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data + solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index)) + } else { + solveLeastSquares(fullXtX, userXy(index)) + } + } + } + + /** + * Solve a least squares problem, possibly with nonnegativity constraints, by a modified + * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. + */ + def solveLSbyPCG(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = { + val n = atb.rows + val scratch = new DoubleMatrix(n, 1) + + // find the optimal unconstrained step + def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = { + val top = SimpleBlas.dot(dir, resid) + SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch) + top / SimpleBlas.dot(scratch, dir) + } + + // stopping condition + def stop(step: Double, ndir: Double, nx: Double): Boolean = { + ((step != step) + || (step < 1e-6) + || (ndir < 1e-12 * nx)) + } + + val grad = new DoubleMatrix(n, 1) + val x = new DoubleMatrix(n, 1) + val dir = new DoubleMatrix(n, 1) + val lastdir = new DoubleMatrix(n, 1) + val resid = new DoubleMatrix(n, 1) + var lastnorm = 0.0 + var iterno = 0 + var lastwall = 0 + var i = 0 + while (iterno < 40000) { + // find the residual + SimpleBlas.gemv(1.0, ata, x, 0.0, resid) + SimpleBlas.axpy(-1.0, atb, resid) + SimpleBlas.copy(resid, grad) + + // project the gradient + if (nonnegative) { + i = 0 + while (i < n) { + if (grad.data(i) > 0.0 && x.data(i) == 0.0) + grad.data(i) = 0.0 + i = i + 1 + } + } + val ngrad = SimpleBlas.dot(grad, grad) + + SimpleBlas.copy(grad, dir) + + // use a CG direction under certain conditions + var step = steplen(grad, resid) + var ndir = 0.0 + val nx = SimpleBlas.dot(x, x) + if (iterno > lastwall + 1) { + val alpha = ngrad / lastnorm + SimpleBlas.axpy(alpha, lastdir, dir) + val dstep = steplen(dir, resid) + ndir = SimpleBlas.dot(dir, dir) + if (stop(dstep, ndir, nx)) { + // reject the CG step if it could lead to premature termination + SimpleBlas.copy(grad, dir) + ndir = SimpleBlas.dot(dir, dir) + } else { + step = dstep + } } else { - Solve.solvePositive(fullXtX, userXy(index)).data + ndir = SimpleBlas.dot(dir, dir) + } + + // terminate? + if (stop(step, ndir, nx)) { + return x.data + } + + // don't run through the walls + if (nonnegative) { + i = 0 + while (i < n) { + if (step * dir.data(i) > x.data(i)) + step = Math.min(step, x.data(i) / dir.data(i)) + i = i + 1 + } + } + + // take the step + i = 0 + while (i < n) { + if (nonnegative) { + if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { + x.data(i) = 0 + lastwall = iterno + } else x.data(i) -= step * dir.data(i) + } else { + x.data(i) -= step * dir.data(i) + } + i = i + 1 } + + iterno = iterno + 1 + SimpleBlas.copy(dir, lastdir) + lastnorm = ngrad + } + x.data + } + + /** + * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject + * to nonnegativity constraints if `nonnegative` is true. + */ + def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix): Array[Double] = { + if (!nonnegative) { + Solve.solvePositive(ata, atb).data + } else { + NNLSbyPCG.solve(ata, atb, true) } } @@ -532,6 +664,34 @@ class ALS private ( */ object ALS { + /** + * Train a matrix factorization model given an RDD of ratings given by users to some products, + * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the + * product of two lower-rank matrices of a given rank (number of features). To solve for these + * features, we run a given number of iterations of ALS. This is done using a level of + * parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param seed random seed + * @param nonnegative Whether to impose nonnegativity constraints + */ + def train( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + seed: Long, + nonnegative: Boolean) = { + val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed) + if (nonnegative) als.setNonnegative(true) + als.run(ratings) + } + /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -613,6 +773,37 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } + /** + * Train a matrix factorization model given an RDD of 'implicit preferences' given by users + * to some products, in the form of (userID, productID, preference) pairs. We approximate the + * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). + * To solve for these features, we run a given number of iterations of ALS. This is done using + * a level of parallelism given by `blocks`. + * + * @param ratings RDD of (userID, productID, rating) pairs + * @param rank number of features to use + * @param iterations number of iterations of ALS (recommended: 10-20) + * @param lambda regularization factor (recommended: 0.01) + * @param blocks level of parallelism to split computation into + * @param alpha confidence parameter (only applies when immplicitPrefs = true) + * @param seed random seed + * @param nonnegative Whether to impose nonnegativity upon the user and product factors + */ + def trainImplicit( + ratings: RDD[Rating], + rank: Int, + iterations: Int, + lambda: Double, + blocks: Int, + alpha: Double, + seed: Long, + nonnegative: Boolean + ): MatrixFactorizationModel = { + new ALS(blocks, rank, iterations, lambda, true, alpha, seed) + .setNonnegative(nonnegative) + .run(ratings) + } + /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. We approximate the From 89ea0a8ef942ba0e62d91d75ca26f92a85b25470 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 19:08:56 -0400 Subject: [PATCH 04/18] Hack ALSSuite to support NNLS testing. --- .../spark/mllib/recommendation/ALSSuite.scala | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 5aab9aba8f9c0..9022e5db83b7f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -48,12 +48,18 @@ object ALSSuite { features: Int, samplingRate: Double, implicitPrefs: Boolean = false, - negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { + negativeWeights: Boolean = false, + negativeFactors: Boolean = true): (Seq[Rating], DoubleMatrix, DoubleMatrix) = { val rand = new Random(42) // Create a random matrix with uniform values from -1 to 1 - def randomMatrix(m: Int, n: Int) = - new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*) + def randomMatrix(m: Int, n: Int) = { + if (negativeFactors) { + new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*) + } else { + new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble()): _*) + } + } val userMatrix = randomMatrix(users, features) val productMatrix = randomMatrix(features, products) @@ -128,6 +134,27 @@ class ALSSuite extends FunSuite with LocalSparkContext { assert(u11 != u2) } + test("negative ids") { + val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) + val ratings = sc.parallelize(data._1.map { case Rating(u,p,r) => Rating(u-25,p-25,r) }) + val correct = data._2 + val model = ALS.train(ratings, 5, 15) + + val pairs = Array.tabulate(50, 50)((u,p) => (u-25,p-25)).flatten + val ans = model.predict(sc.parallelize(pairs)).collect + ans.foreach { r => + val u = r.user + 25 + val p = r.product + 25 + val v = r.rating + val error = v - correct.get(u,p) + assert(math.abs(error) < 0.4) + } + } + + test("NNALS, rank 2") { + testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false) + } + /** * Test if we can correctly factorize R = U * P where U and P are of known rank. * @@ -140,16 +167,21 @@ class ALSSuite extends FunSuite with LocalSparkContext { * @param implicitPrefs flag to test implicit feedback * @param bulkPredict flag to test bulk prediciton * @param negativeWeights whether the generated data can contain negative values + * @param numBlocks number of blocks to partition users and products into + * @param negativeFactors whether the generated user/product factors can have negative entries */ def testALS(users: Int, products: Int, features: Int, iterations: Int, samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false, - bulkPredict: Boolean = false, negativeWeights: Boolean = false) + bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1, + negativeFactors: Boolean = true) { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, - features, samplingRate, implicitPrefs, negativeWeights) + features, samplingRate, implicitPrefs, negativeWeights, negativeFactors) val model = implicitPrefs match { - case false => ALS.train(sc.parallelize(sampledRatings), features, iterations) - case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations) + case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01, + numBlocks, 0L, !negativeFactors) + case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01, + numBlocks, 1.0, 0L, !negativeFactors) } val predictedU = new DoubleMatrix(users, features) From 33bf4f20e28fdeaab2ed6ae36ef97115898c88dd Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 16 Apr 2014 14:14:38 -0400 Subject: [PATCH 05/18] Fix missing space. --- .../scala/org/apache/spark/mllib/recommendation/ALSSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 9022e5db83b7f..4c5e43fbfb161 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -146,7 +146,7 @@ class ALSSuite extends FunSuite with LocalSparkContext { val u = r.user + 25 val p = r.product + 25 val v = r.rating - val error = v - correct.get(u,p) + val error = v - correct.get(u, p) assert(math.abs(error) < 0.4) } } From 9a82fa61cb467db2e0dbc308d15268f15d20500d Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 23:54:58 -0400 Subject: [PATCH 06/18] Fix scalastyle moanings. --- .../org/apache/spark/mllib/optimization/NNLSbyPCG.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index 485c1e46ea8da..9572ef7f8f2a2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -22,7 +22,8 @@ import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: - * Object used to solve nonnegative least squares problems using a modified projected gradient method. + * Object used to solve nonnegative least squares problems using a modified + * projected gradient method. */ @DeveloperApi object NNLSbyPCG { @@ -67,8 +68,9 @@ object NNLSbyPCG { if (nonnegative) { i = 0 while (i < n) { - if (grad.data(i) > 0.0 && x.data(i) == 0.0) + if (grad.data(i) > 0.0 && x.data(i) == 0.0) { grad.data(i) = 0.0 + } i = i + 1 } } @@ -105,8 +107,9 @@ object NNLSbyPCG { if (nonnegative) { i = 0 while (i < n) { - if (step * dir.data(i) > x.data(i)) + if (step * dir.data(i) > x.data(i)) { step = Math.min(step, x.data(i) / dir.data(i)) + } i = i + 1 } } From c288b6ae99bb4aecacead1d399ec61dadd1ce2ab Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sat, 19 Apr 2014 23:55:13 -0400 Subject: [PATCH 07/18] Finish moving the NNLS solver. --- .../spark/mllib/recommendation/ALS.scala | 106 ------------------ 1 file changed, 106 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index d6fa537d6b08b..dfb16d138c966 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -519,112 +519,6 @@ class ALS private ( } } - /** - * Solve a least squares problem, possibly with nonnegativity constraints, by a modified - * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. - */ - def solveLSbyPCG(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = { - val n = atb.rows - val scratch = new DoubleMatrix(n, 1) - - // find the optimal unconstrained step - def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = { - val top = SimpleBlas.dot(dir, resid) - SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch) - top / SimpleBlas.dot(scratch, dir) - } - - // stopping condition - def stop(step: Double, ndir: Double, nx: Double): Boolean = { - ((step != step) - || (step < 1e-6) - || (ndir < 1e-12 * nx)) - } - - val grad = new DoubleMatrix(n, 1) - val x = new DoubleMatrix(n, 1) - val dir = new DoubleMatrix(n, 1) - val lastdir = new DoubleMatrix(n, 1) - val resid = new DoubleMatrix(n, 1) - var lastnorm = 0.0 - var iterno = 0 - var lastwall = 0 - var i = 0 - while (iterno < 40000) { - // find the residual - SimpleBlas.gemv(1.0, ata, x, 0.0, resid) - SimpleBlas.axpy(-1.0, atb, resid) - SimpleBlas.copy(resid, grad) - - // project the gradient - if (nonnegative) { - i = 0 - while (i < n) { - if (grad.data(i) > 0.0 && x.data(i) == 0.0) - grad.data(i) = 0.0 - i = i + 1 - } - } - val ngrad = SimpleBlas.dot(grad, grad) - - SimpleBlas.copy(grad, dir) - - // use a CG direction under certain conditions - var step = steplen(grad, resid) - var ndir = 0.0 - val nx = SimpleBlas.dot(x, x) - if (iterno > lastwall + 1) { - val alpha = ngrad / lastnorm - SimpleBlas.axpy(alpha, lastdir, dir) - val dstep = steplen(dir, resid) - ndir = SimpleBlas.dot(dir, dir) - if (stop(dstep, ndir, nx)) { - // reject the CG step if it could lead to premature termination - SimpleBlas.copy(grad, dir) - ndir = SimpleBlas.dot(dir, dir) - } else { - step = dstep - } - } else { - ndir = SimpleBlas.dot(dir, dir) - } - - // terminate? - if (stop(step, ndir, nx)) { - return x.data - } - - // don't run through the walls - if (nonnegative) { - i = 0 - while (i < n) { - if (step * dir.data(i) > x.data(i)) - step = Math.min(step, x.data(i) / dir.data(i)) - i = i + 1 - } - } - - // take the step - i = 0 - while (i < n) { - if (nonnegative) { - if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { - x.data(i) = 0 - lastwall = iterno - } else x.data(i) -= step * dir.data(i) - } else { - x.data(i) -= step * dir.data(i) - } - i = i + 1 - } - - iterno = iterno + 1 - SimpleBlas.copy(dir, lastdir) - lastnorm = ngrad - } - x.data - } - /** * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject * to nonnegativity constraints if `nonnegative` is true. From ac673bda27883d4bb4085fa326380aaed29b1a70 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sun, 20 Apr 2014 15:29:10 -0400 Subject: [PATCH 08/18] More safeguards against numerical ridiculousness. --- .../apache/spark/mllib/optimization/NNLSbyPCG.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index 9572ef7f8f2a2..779f374158c3b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -39,14 +39,18 @@ object NNLSbyPCG { def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = { val top = SimpleBlas.dot(dir, resid) SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch) - top / SimpleBlas.dot(scratch, dir) + // Push the denominator upward very slightly to avoid infinities and silliness + top / (SimpleBlas.dot(scratch, dir) + 1e-20) } // stopping condition def stop(step: Double, ndir: Double, nx: Double): Boolean = { - ((step != step) - || (step < 1e-6) - || (ndir < 1e-12 * nx)) + ((step != step) // NaN + || (step < 1e-6) // too small or negative + || (step > 1e40) // too small; almost certainly numerical problems + || (ndir < 1e-12 * nx) // gradient relatively too small + || (ndir < 1e-32) // gradient absolutely too small; numerical issues may lurk + ) } val grad = new DoubleMatrix(n, 1) @@ -134,5 +138,4 @@ object NNLSbyPCG { } x.data } - } From 5345402b6d44cd9fad46de830b0f7b4c8d3f9268 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Sun, 20 Apr 2014 22:04:35 -0400 Subject: [PATCH 09/18] Style fixes that got eaten. --- .../org/apache/spark/mllib/recommendation/ALSSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 4c5e43fbfb161..21466b8ba23ba 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -136,12 +136,14 @@ class ALSSuite extends FunSuite with LocalSparkContext { test("negative ids") { val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) - val ratings = sc.parallelize(data._1.map { case Rating(u,p,r) => Rating(u-25,p-25,r) }) + val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) => + Rating(u - 25, p - 25, r) + }) val correct = data._2 val model = ALS.train(ratings, 5, 15) - val pairs = Array.tabulate(50, 50)((u,p) => (u-25,p-25)).flatten - val ans = model.predict(sc.parallelize(pairs)).collect + val pairs = Array.tabulate(50, 50)((u, p) => (u - 25, p - 25)).flatten + val ans = model.predict(sc.parallelize(pairs)).collect() ans.foreach { r => val u = r.user + 25 val p = r.product + 25 From 8a1a436207426bea8fa2227f3e1390d5d1a839e3 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 12:30:56 -0400 Subject: [PATCH 10/18] Describe the problem and add a reference to Polyak's paper. --- .../apache/spark/mllib/optimization/NNLSbyPCG.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index 779f374158c3b..3c91bd02f534f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -30,6 +30,16 @@ object NNLSbyPCG { /** * Solve a least squares problem, possibly with nonnegativity constraints, by a modified * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. + * + * We solve the problem + * min_x 1/2 x^T ata x^T - x^T atb + * subject to x >= 0 (if nonnegative == true) + * + * The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient + * method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound- + * constrained nonlinear programming. Polyak unconditionally uses a conjugate gradient + * direction, however, while this method only uses a conjugate gradient direction if the last + * iteration did not cause a previously-inactive constraint to become active. */ def solve(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = { val n = atb.rows From 9c820b6133c304a7adefa256e3a09eb3d226ec82 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 12:33:46 -0400 Subject: [PATCH 11/18] Tweak variable names. --- .../spark/mllib/optimization/NNLSbyPCG.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index 3c91bd02f534f..99a3ffb96f8fd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.optimization import org.jblas.{DoubleMatrix, SimpleBlas} + import org.apache.spark.annotation.DeveloperApi /** @@ -26,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi * projected gradient method. */ @DeveloperApi -object NNLSbyPCG { +private[mllib] object NNLSbyPCG { /** * Solve a least squares problem, possibly with nonnegativity constraints, by a modified * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. @@ -46,8 +47,8 @@ object NNLSbyPCG { val scratch = new DoubleMatrix(n, 1) // find the optimal unconstrained step - def steplen(dir: DoubleMatrix, resid: DoubleMatrix): Double = { - val top = SimpleBlas.dot(dir, resid) + def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = { + val top = SimpleBlas.dot(dir, res) SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch) // Push the denominator upward very slightly to avoid infinities and silliness top / (SimpleBlas.dot(scratch, dir) + 1e-20) @@ -66,17 +67,17 @@ object NNLSbyPCG { val grad = new DoubleMatrix(n, 1) val x = new DoubleMatrix(n, 1) val dir = new DoubleMatrix(n, 1) - val lastdir = new DoubleMatrix(n, 1) - val resid = new DoubleMatrix(n, 1) - var lastnorm = 0.0 + val lastDir = new DoubleMatrix(n, 1) + val res = new DoubleMatrix(n, 1) + var lastNorm = 0.0 var iterno = 0 - var lastwall = 0 + var lastWall = 0 // Last iteration when we hit a bound constraint. var i = 0 while (iterno < 40000) { // find the residual - SimpleBlas.gemv(1.0, ata, x, 0.0, resid) - SimpleBlas.axpy(-1.0, atb, resid) - SimpleBlas.copy(resid, grad) + SimpleBlas.gemv(1.0, ata, x, 0.0, res) + SimpleBlas.axpy(-1.0, atb, res) + SimpleBlas.copy(res, grad) // project the gradient if (nonnegative) { @@ -93,13 +94,13 @@ object NNLSbyPCG { SimpleBlas.copy(grad, dir) // use a CG direction under certain conditions - var step = steplen(grad, resid) + var step = steplen(grad, res) var ndir = 0.0 val nx = SimpleBlas.dot(x, x) - if (iterno > lastwall + 1) { - val alpha = ngrad / lastnorm - SimpleBlas.axpy(alpha, lastdir, dir) - val dstep = steplen(dir, resid) + if (iterno > lastWall + 1) { + val alpha = ngrad / lastNorm + SimpleBlas.axpy(alpha, lastDir, dir) + val dstep = steplen(dir, res) ndir = SimpleBlas.dot(dir, dir) if (stop(dstep, ndir, nx)) { // reject the CG step if it could lead to premature termination @@ -134,7 +135,7 @@ object NNLSbyPCG { if (nonnegative) { if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { x.data(i) = 0 - lastwall = iterno + lastWall = iterno } else x.data(i) -= step * dir.data(i) } else { x.data(i) -= step * dir.data(i) @@ -143,8 +144,8 @@ object NNLSbyPCG { } iterno = iterno + 1 - SimpleBlas.copy(dir, lastdir) - lastnorm = ngrad + SimpleBlas.copy(dir, lastDir) + lastNorm = ngrad } x.data } From b2851066c88f5436e18877ee2eb337b522c08512 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 13:47:09 -0400 Subject: [PATCH 12/18] Clean up NNLS test cases. --- .../spark/mllib/optimization/NNLSSuite.scala | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala index f6f054741721a..7f6e828a10b51 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala @@ -20,40 +20,55 @@ package org.apache.spark.mllib.optimization import scala.util.Random import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.mllib.util.LocalSparkContext +import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas} -import org.jblas.DoubleMatrix -import org.jblas.SimpleBlas - -class NNLSSuite extends FunSuite with LocalSparkContext with ShouldMatchers { +class NNLSSuite extends FunSuite { test("NNLSbyPCG: exact solution case") { - val A = new DoubleMatrix(20, 20) - val b = new DoubleMatrix(20, 1) + val n = 20 + val A = new DoubleMatrix(n, n) + val b = new DoubleMatrix(n, 1) val rand = new Random(12345) - for (i <- 0 until 20; j <- 0 until 20) { + for (i <- 0 until n; j <- 0 until n) { val aij = rand.nextDouble() A.put(i, j, aij) b.put(i, b.get(i, 0) + aij) } - val ata = new DoubleMatrix(20, 20) - val atb = new DoubleMatrix(20, 1) - for (i <- 0 until 20; j <- 0 until 20; k <- 0 until 20) { - ata.put(i, j, ata.get(i, j) + A.get(k, i) * A.get(k, j)) - } - for (i <- 0 until 20; j <- 0 until 20) { - atb.put(i, atb.get(i, 0) + A.get(j, i) * b.get(j)) - } + val ata = new DoubleMatrix(n, n) + val atb = new DoubleMatrix(n, 1) + + NativeBlas.dgemm('T', 'N', n, n, n, 1.0, A.data, 0, n, A.data, 0, n, 0.0, ata.data, 0, n) + NativeBlas.dgemv('T', n, n, 1.0, A.data, 0, n, b.data, 0, 1, 0.0, atb.data, 0, 1) val x = NNLSbyPCG.solve(ata, atb, true) - assert(x.length == 20) + assert(x.length == n) var error = 0.0 - for (i <- 0 until 20) { + for (i <- 0 until n) { error = error + (x(i) - 1) * (x(i) - 1) assert(Math.abs(x(i) - 1) < 1e-3) } assert(error < 1e-2) } + + test("NNLSbyPCG: nonnegativity constraint active") { + val n = 5 + val M = Array( + Array( 4.377, -3.531, -1.306, -0.139, 3.418, -1.632), + Array(-3.531, 4.344, 0.934, 0.305, -2.140, 2.115), + Array(-1.306, 0.934, 2.644, -0.203, -0.170, 1.094), + Array(-0.139, 0.305, -0.203, 5.883, 1.428, -1.025), + Array( 3.418, -2.140, -0.170, 1.428, 4.684, -0.636)) + val ata = new DoubleMatrix(5, 5) + val atb = new DoubleMatrix(5, 1) + for (i <- 0 until 5; j <- 0 until 5) ata.put(i, j, M(i)(j)) + for (i <- 0 until 5) atb.put(i, M(i)(5)) + + val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628) + + val x = NNLSbyPCG.solve(ata, atb, true) + for (i <- 0 until 5) { + assert(Math.abs(x(i) - goodx(i)) < 1e-3) + } + } } From e2a01d179df48dab74d1cd42f07755a66146b74c Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 14:48:55 -0400 Subject: [PATCH 13/18] Create a workspace object for NNLS to cut down on memory allocations. --- .../spark/mllib/optimization/NNLSbyPCG.scala | 48 +++++++++++++---- .../spark/mllib/recommendation/ALS.scala | 11 ++-- .../spark/mllib/optimization/NNLSSuite.scala | 53 +++++++++++++------ 3 files changed, 83 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index 99a3ffb96f8fd..f889ac2842d1b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -21,6 +21,7 @@ import org.jblas.{DoubleMatrix, SimpleBlas} import org.apache.spark.annotation.DeveloperApi + /** * :: DeveloperApi :: * Object used to solve nonnegative least squares problems using a modified @@ -28,6 +29,32 @@ import org.apache.spark.annotation.DeveloperApi */ @DeveloperApi private[mllib] object NNLSbyPCG { + class Workspace(val n: Int) { + val scratch = new DoubleMatrix(n, 1) + val grad = new DoubleMatrix(n, 1) + val x = new DoubleMatrix(n, 1) + val dir = new DoubleMatrix(n, 1) + val lastDir = new DoubleMatrix(n, 1) + val res = new DoubleMatrix(n, 1) + + def wipe() { + var i: Int = 0 + while (i < n) { + scratch.data(i) = 0.0 + grad.data(i) = 0.0 + x.data(i) = 0.0 + dir.data(i) = 0.0 + lastDir.data(i) = 0.0 + res.data(i) = 0.0 + i = i + 1 + } + } + } + + def createWorkspace(n: Int): Workspace = { + new Workspace(n) + } + /** * Solve a least squares problem, possibly with nonnegativity constraints, by a modified * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b. @@ -42,9 +69,12 @@ private[mllib] object NNLSbyPCG { * direction, however, while this method only uses a conjugate gradient direction if the last * iteration did not cause a previously-inactive constraint to become active. */ - def solve(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean): Array[Double] = { + def solve(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean, + ws: Workspace): Array[Double] = { + ws.wipe() + val n = atb.rows - val scratch = new DoubleMatrix(n, 1) + val scratch = ws.scratch // find the optimal unconstrained step def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = { @@ -64,11 +94,11 @@ private[mllib] object NNLSbyPCG { ) } - val grad = new DoubleMatrix(n, 1) - val x = new DoubleMatrix(n, 1) - val dir = new DoubleMatrix(n, 1) - val lastDir = new DoubleMatrix(n, 1) - val res = new DoubleMatrix(n, 1) + val grad = ws.grad + val x = ws.x + val dir = ws.dir + val lastDir = ws.lastDir + val res = ws.res var lastNorm = 0.0 var iterno = 0 var lastWall = 0 // Last iteration when we hit a bound constraint. @@ -115,7 +145,7 @@ private[mllib] object NNLSbyPCG { // terminate? if (stop(step, ndir, nx)) { - return x.data + return x.data.clone } // don't run through the walls @@ -147,6 +177,6 @@ private[mllib] object NNLSbyPCG { SimpleBlas.copy(dir, lastDir) lastNorm = ngrad } - x.data + x.data.clone } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index dfb16d138c966..29a7c003c4172 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -504,6 +504,8 @@ class ALS private ( } } + val ws = NNLSbyPCG.createWorkspace(rank) + // Solve the least-squares problem for each user and return the new feature vectors Array.range(0, numUsers).map { index => // Compute the full XtX matrix from the lower-triangular part we got above @@ -512,9 +514,9 @@ class ALS private ( (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite if (implicitPrefs) { - solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index)) + solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws) } else { - solveLeastSquares(fullXtX, userXy(index)) + solveLeastSquares(fullXtX, userXy(index), ws) } } } @@ -523,11 +525,12 @@ class ALS private ( * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject * to nonnegativity constraints if `nonnegative` is true. */ - def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix): Array[Double] = { + def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix, + ws: NNLSbyPCG.Workspace): Array[Double] = { if (!nonnegative) { Solve.solvePositive(ata, atb).data } else { - NNLSbyPCG.solve(ata, atb, true) + NNLSbyPCG.solve(ata, atb, true, ws) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala index 7f6e828a10b51..6216d51bd006b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala @@ -24,11 +24,10 @@ import org.scalatest.FunSuite import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas} class NNLSSuite extends FunSuite { - test("NNLSbyPCG: exact solution case") { - val n = 20 + /** Generate a NNLS problem whose optimal solution is the all-ones vector. */ + def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = { val A = new DoubleMatrix(n, n) val b = new DoubleMatrix(n, 1) - val rand = new Random(12345) for (i <- 0 until n; j <- 0 until n) { val aij = rand.nextDouble() A.put(i, j, aij) @@ -41,14 +40,35 @@ class NNLSSuite extends FunSuite { NativeBlas.dgemm('T', 'N', n, n, n, 1.0, A.data, 0, n, A.data, 0, n, 0.0, ata.data, 0, n) NativeBlas.dgemv('T', n, n, 1.0, A.data, 0, n, b.data, 0, 1, 0.0, atb.data, 0, 1) - val x = NNLSbyPCG.solve(ata, atb, true) - assert(x.length == n) - var error = 0.0 - for (i <- 0 until n) { - error = error + (x(i) - 1) * (x(i) - 1) - assert(Math.abs(x(i) - 1) < 1e-3) + (ata, atb) + } + + test("NNLSbyPCG: exact solution cases") { + val n = 20 + val rand = new Random(12346) + val ws = NNLSbyPCG.createWorkspace(n) + var numSolved = 0 + + // About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3. NNLSbyPCG + // can legitimately fail to solve these anywhere close to exactly. So we grab a considerable + // sample of these matrices and make sure that we solved a substantial fraction of them. + + for (kase <- 0 until 100) { + val (ata, atb) = genOnesData(n, rand) + val x = NNLSbyPCG.solve(ata, atb, true, ws) + assert(x.length == n) + var error = 0.0 + var solved = true + for (i <- 0 until n) { + error = error + (x(i) - 1) * (x(i) - 1) + if (Math.abs(x(i) - 1) > 1e-3) solved = false + } + if (error > 1e-2) solved = false + if (solved) numSolved = numSolved + 1 } - assert(error < 1e-2) + println(numSolved) + + assert(numSolved > 50) } test("NNLSbyPCG: nonnegativity constraint active") { @@ -59,15 +79,16 @@ class NNLSSuite extends FunSuite { Array(-1.306, 0.934, 2.644, -0.203, -0.170, 1.094), Array(-0.139, 0.305, -0.203, 5.883, 1.428, -1.025), Array( 3.418, -2.140, -0.170, 1.428, 4.684, -0.636)) - val ata = new DoubleMatrix(5, 5) - val atb = new DoubleMatrix(5, 1) - for (i <- 0 until 5; j <- 0 until 5) ata.put(i, j, M(i)(j)) - for (i <- 0 until 5) atb.put(i, M(i)(5)) + val ata = new DoubleMatrix(n, n) + val atb = new DoubleMatrix(n, 1) + for (i <- 0 until n; j <- 0 until n) ata.put(i, j, M(i)(j)) + for (i <- 0 until n) atb.put(i, M(i)(n)) val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628) - val x = NNLSbyPCG.solve(ata, atb, true) - for (i <- 0 until 5) { + val ws = NNLSbyPCG.createWorkspace(n) + val x = NNLSbyPCG.solve(ata, atb, true, ws) + for (i <- 0 until n) { assert(Math.abs(x(i) - goodx(i)) < 1e-3) } } From 0cb44812d75099a352615a35ff95504e654a278b Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 21 Apr 2014 17:56:11 -0400 Subject: [PATCH 14/18] Drop the iteration limit from 40k to max(400,20n). --- .../scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala index f889ac2842d1b..ef732d32c2394 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala @@ -99,11 +99,12 @@ private[mllib] object NNLSbyPCG { val dir = ws.dir val lastDir = ws.lastDir val res = ws.res + val iterMax = Math.max(400, 20 * n) var lastNorm = 0.0 var iterno = 0 var lastWall = 0 // Last iteration when we hit a bound constraint. var i = 0 - while (iterno < 40000) { + while (iterno < iterMax) { // find the residual SimpleBlas.gemv(1.0, ata, x, 0.0, res) SimpleBlas.axpy(-1.0, atb, res) From 2d4f3cb54aac1b12392cc75277bb1d232418b9cf Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Fri, 25 Apr 2014 12:32:43 -0400 Subject: [PATCH 15/18] Cleanup. --- .../{NNLSbyPCG.scala => NNLS.scala} | 60 +++++++------------ .../spark/mllib/recommendation/ALS.scala | 14 ++--- .../spark/mllib/optimization/NNLSSuite.scala | 18 +++--- 3 files changed, 39 insertions(+), 53 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/optimization/{NNLSbyPCG.scala => NNLS.scala} (81%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala similarity index 81% rename from mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala rename to mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala index ef732d32c2394..e4b436b023794 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLSbyPCG.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala @@ -21,14 +21,11 @@ import org.jblas.{DoubleMatrix, SimpleBlas} import org.apache.spark.annotation.DeveloperApi - /** - * :: DeveloperApi :: * Object used to solve nonnegative least squares problems using a modified * projected gradient method. */ -@DeveloperApi -private[mllib] object NNLSbyPCG { +private[mllib] object NNLS { class Workspace(val n: Int) { val scratch = new DoubleMatrix(n, 1) val grad = new DoubleMatrix(n, 1) @@ -38,16 +35,12 @@ private[mllib] object NNLSbyPCG { val res = new DoubleMatrix(n, 1) def wipe() { - var i: Int = 0 - while (i < n) { - scratch.data(i) = 0.0 - grad.data(i) = 0.0 - x.data(i) = 0.0 - dir.data(i) = 0.0 - lastDir.data(i) = 0.0 - res.data(i) = 0.0 - i = i + 1 - } + scratch.fill(0.0) + grad.fill(0.0) + x.fill(0.0) + dir.fill(0.0) + lastDir.fill(0.0) + res.fill(0.0) } } @@ -61,7 +54,7 @@ private[mllib] object NNLSbyPCG { * * We solve the problem * min_x 1/2 x^T ata x^T - x^T atb - * subject to x >= 0 (if nonnegative == true) + * subject to x >= 0 * * The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient * method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound- @@ -69,8 +62,7 @@ private[mllib] object NNLSbyPCG { * direction, however, while this method only uses a conjugate gradient direction if the last * iteration did not cause a previously-inactive constraint to become active. */ - def solve(ata: DoubleMatrix, atb: DoubleMatrix, nonnegative: Boolean, - ws: Workspace): Array[Double] = { + def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = { ws.wipe() val n = atb.rows @@ -86,7 +78,7 @@ private[mllib] object NNLSbyPCG { // stopping condition def stop(step: Double, ndir: Double, nx: Double): Boolean = { - ((step != step) // NaN + ((step.isNaN) // NaN || (step < 1e-6) // too small or negative || (step > 1e40) // too small; almost certainly numerical problems || (ndir < 1e-12 * nx) // gradient relatively too small @@ -111,14 +103,12 @@ private[mllib] object NNLSbyPCG { SimpleBlas.copy(res, grad) // project the gradient - if (nonnegative) { - i = 0 - while (i < n) { - if (grad.data(i) > 0.0 && x.data(i) == 0.0) { - grad.data(i) = 0.0 - } - i = i + 1 + i = 0 + while (i < n) { + if (grad.data(i) > 0.0 && x.data(i) == 0.0) { + grad.data(i) = 0.0 } + i = i + 1 } val ngrad = SimpleBlas.dot(grad, grad) @@ -150,24 +140,20 @@ private[mllib] object NNLSbyPCG { } // don't run through the walls - if (nonnegative) { - i = 0 - while (i < n) { - if (step * dir.data(i) > x.data(i)) { - step = Math.min(step, x.data(i) / dir.data(i)) - } - i = i + 1 + i = 0 + while (i < n) { + if (step * dir.data(i) > x.data(i)) { + step = x.data(i) / dir.data(i) } + i = i + 1 } // take the step i = 0 while (i < n) { - if (nonnegative) { - if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { - x.data(i) = 0 - lastWall = iterno - } else x.data(i) -= step * dir.data(i) + if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) { + x.data(i) = 0 + lastWall = iterno } else { x.data(i) -= step * dir.data(i) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 29a7c003c4172..3901372262022 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -33,7 +33,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.SparkContext._ import org.apache.spark.util.Utils -import org.apache.spark.mllib.optimization.NNLSbyPCG +import org.apache.spark.mllib.optimization.NNLS /** * Out-link information for a user or product block. This includes the original user/product IDs @@ -504,7 +504,7 @@ class ALS private ( } } - val ws = NNLSbyPCG.createWorkspace(rank) + val ws = if (nonnegative) NNLS.createWorkspace(rank) else null // Solve the least-squares problem for each user and return the new feature vectors Array.range(0, numUsers).map { index => @@ -526,11 +526,11 @@ class ALS private ( * to nonnegativity constraints if `nonnegative` is true. */ def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix, - ws: NNLSbyPCG.Workspace): Array[Double] = { + ws: NNLS.Workspace): Array[Double] = { if (!nonnegative) { Solve.solvePositive(ata, atb).data } else { - NNLSbyPCG.solve(ata, atb, true, ws) + NNLS.solve(ata, atb, ws) } } @@ -566,7 +566,7 @@ object ALS { * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the * product of two lower-rank matrices of a given rank (number of features). To solve for these * features, we run a given number of iterations of ALS. This is done using a level of - * parallelism given by `blocks`, partitioning the data using the Partitioner `partitioner`. + * parallelism given by `blocks`. * * @param ratings RDD of (userID, productID, rating) pairs * @param rank number of features to use @@ -574,7 +574,7 @@ object ALS { * @param lambda regularization factor (recommended: 0.01) * @param blocks level of parallelism to split computation into * @param seed random seed - * @param nonnegative Whether to impose nonnegativity constraints + * @param nonnegative whether to impose nonnegativity constraints */ def train( ratings: RDD[Rating], @@ -585,7 +585,7 @@ object ALS { seed: Long, nonnegative: Boolean) = { val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed) - if (nonnegative) als.setNonnegative(true) + als.setNonnegative(nonnegative) als.run(ratings) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala index 6216d51bd006b..07a01b61b5b59 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala @@ -43,20 +43,20 @@ class NNLSSuite extends FunSuite { (ata, atb) } - test("NNLSbyPCG: exact solution cases") { + test("NNLS: exact solution cases") { val n = 20 val rand = new Random(12346) - val ws = NNLSbyPCG.createWorkspace(n) + val ws = NNLS.createWorkspace(n) var numSolved = 0 - // About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3. NNLSbyPCG + // About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3. NNLS // can legitimately fail to solve these anywhere close to exactly. So we grab a considerable // sample of these matrices and make sure that we solved a substantial fraction of them. for (kase <- 0 until 100) { val (ata, atb) = genOnesData(n, rand) - val x = NNLSbyPCG.solve(ata, atb, true, ws) - assert(x.length == n) + val x = NNLS.solve(ata, atb, ws) + assert(x.length === n) var error = 0.0 var solved = true for (i <- 0 until n) { @@ -66,12 +66,11 @@ class NNLSSuite extends FunSuite { if (error > 1e-2) solved = false if (solved) numSolved = numSolved + 1 } - println(numSolved) assert(numSolved > 50) } - test("NNLSbyPCG: nonnegativity constraint active") { + test("NNLS: nonnegativity constraint active") { val n = 5 val M = Array( Array( 4.377, -3.531, -1.306, -0.139, 3.418, -1.632), @@ -86,10 +85,11 @@ class NNLSSuite extends FunSuite { val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628) - val ws = NNLSbyPCG.createWorkspace(n) - val x = NNLSbyPCG.solve(ata, atb, true, ws) + val ws = NNLS.createWorkspace(n) + val x = NNLS.solve(ata, atb, ws) for (i <- 0 until n) { assert(Math.abs(x(i) - goodx(i)) < 1e-3) + assert(x(i) >= 0) } } } From 65ef7f263db217b3265e9a7ccb81bbd0930d1af9 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Fri, 25 Apr 2014 12:41:26 -0400 Subject: [PATCH 16/18] Make ALS's ctor public and remove a couple of "convenience" wrappers. --- .../spark/mllib/recommendation/ALS.scala | 62 +------------------ .../spark/mllib/recommendation/ALSSuite.scala | 9 +-- 2 files changed, 4 insertions(+), 67 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 3901372262022..4f10dacf48c86 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -93,7 +93,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * indicated user * preferences rather than explicit ratings given to items. */ -class ALS private ( +class ALS ( private var numBlocks: Int, private var rank: Int, private var iterations: Int, @@ -560,35 +560,6 @@ class ALS private ( * Top-level methods for calling Alternating Least Squares (ALS) matrix factorization. */ object ALS { - - /** - * Train a matrix factorization model given an RDD of ratings given by users to some products, - * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the - * product of two lower-rank matrices of a given rank (number of features). To solve for these - * features, we run a given number of iterations of ALS. This is done using a level of - * parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param seed random seed - * @param nonnegative whether to impose nonnegativity constraints - */ - def train( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - seed: Long, - nonnegative: Boolean) = { - val als = new ALS(blocks, rank, iterations, lambda, false, 1.0, seed) - als.setNonnegative(nonnegative) - als.run(ratings) - } - /** * Train a matrix factorization model given an RDD of ratings given by users to some products, * in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the @@ -670,37 +641,6 @@ object ALS { train(ratings, rank, iterations, 0.01, -1) } - /** - * Train a matrix factorization model given an RDD of 'implicit preferences' given by users - * to some products, in the form of (userID, productID, preference) pairs. We approximate the - * ratings matrix as the product of two lower-rank matrices of a given rank (number of features). - * To solve for these features, we run a given number of iterations of ALS. This is done using - * a level of parallelism given by `blocks`. - * - * @param ratings RDD of (userID, productID, rating) pairs - * @param rank number of features to use - * @param iterations number of iterations of ALS (recommended: 10-20) - * @param lambda regularization factor (recommended: 0.01) - * @param blocks level of parallelism to split computation into - * @param alpha confidence parameter (only applies when immplicitPrefs = true) - * @param seed random seed - * @param nonnegative Whether to impose nonnegativity upon the user and product factors - */ - def trainImplicit( - ratings: RDD[Rating], - rank: Int, - iterations: Int, - lambda: Double, - blocks: Int, - alpha: Double, - seed: Long, - nonnegative: Boolean - ): MatrixFactorizationModel = { - new ALS(blocks, rank, iterations, lambda, true, alpha, seed) - .setNonnegative(nonnegative) - .run(ratings) - } - /** * Train a matrix factorization model given an RDD of 'implicit preferences' given by users * to some products, in the form of (userID, productID, preference) pairs. We approximate the diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 21466b8ba23ba..d8f2e6f34bec6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -179,12 +179,9 @@ class ALSSuite extends FunSuite with LocalSparkContext { { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs, negativeWeights, negativeFactors) - val model = implicitPrefs match { - case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01, - numBlocks, 0L, !negativeFactors) - case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01, - numBlocks, 1.0, 0L, !negativeFactors) - } + + val model = (new ALS(numBlocks, features, iterations, 0.01, implicitPrefs, 1.0).setSeed(0L) + .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings))) val predictedU = new DoubleMatrix(users, features) for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) { From 7fbabf1326085ffb2f58c885b7ca936d0a5ee27e Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Fri, 25 Apr 2014 12:56:12 -0400 Subject: [PATCH 17/18] Cleanup matrix math in NNLSSuite. --- .../spark/mllib/optimization/NNLSSuite.scala | 49 +++++++------------ 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala index 07a01b61b5b59..bbf385229081a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala @@ -24,21 +24,13 @@ import org.scalatest.FunSuite import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas} class NNLSSuite extends FunSuite { - /** Generate a NNLS problem whose optimal solution is the all-ones vector. */ + /** Generate an NNLS problem whose optimal solution is the all-ones vector. */ def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = { - val A = new DoubleMatrix(n, n) - val b = new DoubleMatrix(n, 1) - for (i <- 0 until n; j <- 0 until n) { - val aij = rand.nextDouble() - A.put(i, j, aij) - b.put(i, b.get(i, 0) + aij) - } - - val ata = new DoubleMatrix(n, n) - val atb = new DoubleMatrix(n, 1) + val A = new DoubleMatrix(n, n, Array.fill(n*n)(rand.nextDouble()): _*) + val b = A.mmul(DoubleMatrix.ones(n, 1)) - NativeBlas.dgemm('T', 'N', n, n, n, 1.0, A.data, 0, n, A.data, 0, n, 0.0, ata.data, 0, n) - NativeBlas.dgemv('T', n, n, 1.0, A.data, 0, n, b.data, 0, 1, 0.0, atb.data, 0, 1) + val ata = A.transpose.mmul(A) + val atb = A.transpose.mmul(b) (ata, atb) } @@ -53,17 +45,13 @@ class NNLSSuite extends FunSuite { // can legitimately fail to solve these anywhere close to exactly. So we grab a considerable // sample of these matrices and make sure that we solved a substantial fraction of them. - for (kase <- 0 until 100) { + for (k <- 0 until 100) { val (ata, atb) = genOnesData(n, rand) - val x = NNLS.solve(ata, atb, ws) + val x = new DoubleMatrix(NNLS.solve(ata, atb, ws)) assert(x.length === n) - var error = 0.0 - var solved = true - for (i <- 0 until n) { - error = error + (x(i) - 1) * (x(i) - 1) - if (Math.abs(x(i) - 1) > 1e-3) solved = false - } - if (error > 1e-2) solved = false + val answer = DoubleMatrix.ones(n, 1) + SimpleBlas.axpy(-1.0, answer, x) + val solved = (x.norm2 < 1e-2) && (x.normmax < 1e-3) if (solved) numSolved = numSolved + 1 } @@ -72,16 +60,13 @@ class NNLSSuite extends FunSuite { test("NNLS: nonnegativity constraint active") { val n = 5 - val M = Array( - Array( 4.377, -3.531, -1.306, -0.139, 3.418, -1.632), - Array(-3.531, 4.344, 0.934, 0.305, -2.140, 2.115), - Array(-1.306, 0.934, 2.644, -0.203, -0.170, 1.094), - Array(-0.139, 0.305, -0.203, 5.883, 1.428, -1.025), - Array( 3.418, -2.140, -0.170, 1.428, 4.684, -0.636)) - val ata = new DoubleMatrix(n, n) - val atb = new DoubleMatrix(n, 1) - for (i <- 0 until n; j <- 0 until n) ata.put(i, j, M(i)(j)) - for (i <- 0 until n) atb.put(i, M(i)(n)) + val ata = new DoubleMatrix(Array( + Array( 4.377, -3.531, -1.306, -0.139, 3.418), + Array(-3.531, 4.344, 0.934, 0.305, -2.140), + Array(-1.306, 0.934, 2.644, -0.203, -0.170), + Array(-0.139, 0.305, -0.203, 5.883, 1.428), + Array( 3.418, -2.140, -0.170, 1.428, 4.684))) + val atb = new DoubleMatrix(Array(-1.632, 2.115, 1.094, -1.025, -0.636)) val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628) From 199b0bc21e480a726ed9dd682ff5115e6591cdad Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Fri, 25 Apr 2014 15:24:24 -0400 Subject: [PATCH 18/18] Make the ctor private again and use the builder pattern. --- .../main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 2 +- .../scala/org/apache/spark/mllib/recommendation/ALSSuite.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 4f10dacf48c86..c2b2efd3c238b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -93,7 +93,7 @@ case class Rating(val user: Int, val product: Int, val rating: Double) * indicated user * preferences rather than explicit ratings given to items. */ -class ALS ( +class ALS private ( private var numBlocks: Int, private var rank: Int, private var iterations: Int, diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index d8f2e6f34bec6..a887524e3e19d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -180,7 +180,8 @@ class ALSSuite extends FunSuite with LocalSparkContext { val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products, features, samplingRate, implicitPrefs, negativeWeights, negativeFactors) - val model = (new ALS(numBlocks, features, iterations, 0.01, implicitPrefs, 1.0).setSeed(0L) + val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations) + .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L) .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings))) val predictedU = new DoubleMatrix(users, features)