Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.optimization

import org.jblas.{DoubleMatrix, SimpleBlas}

import org.apache.spark.annotation.DeveloperApi

/**
* Object used to solve nonnegative least squares problems using a modified
* projected gradient method.
*/
private[mllib] object NNLS {
class Workspace(val n: Int) {
val scratch = new DoubleMatrix(n, 1)
val grad = new DoubleMatrix(n, 1)
val x = new DoubleMatrix(n, 1)
val dir = new DoubleMatrix(n, 1)
val lastDir = new DoubleMatrix(n, 1)
val res = new DoubleMatrix(n, 1)

def wipe() {
scratch.fill(0.0)
grad.fill(0.0)
x.fill(0.0)
dir.fill(0.0)
lastDir.fill(0.0)
res.fill(0.0)
}
}

def createWorkspace(n: Int): Workspace = {
new Workspace(n)
}

/**
* Solve a least squares problem, possibly with nonnegativity constraints, by a modified
* projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
*
* We solve the problem
* min_x 1/2 x^T ata x^T - x^T atb
* subject to x >= 0
*
* The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient
* method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound-
* constrained nonlinear programming. Polyak unconditionally uses a conjugate gradient
* direction, however, while this method only uses a conjugate gradient direction if the last
* iteration did not cause a previously-inactive constraint to become active.
*/
def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
ws.wipe()

val n = atb.rows
val scratch = ws.scratch

// find the optimal unconstrained step
def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
val top = SimpleBlas.dot(dir, res)
SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
// Push the denominator upward very slightly to avoid infinities and silliness
top / (SimpleBlas.dot(scratch, dir) + 1e-20)
}

// stopping condition
def stop(step: Double, ndir: Double, nx: Double): Boolean = {
((step.isNaN) // NaN
|| (step < 1e-6) // too small or negative
|| (step > 1e40) // too small; almost certainly numerical problems
|| (ndir < 1e-12 * nx) // gradient relatively too small
|| (ndir < 1e-32) // gradient absolutely too small; numerical issues may lurk
)
}

val grad = ws.grad
val x = ws.x
val dir = ws.dir
val lastDir = ws.lastDir
val res = ws.res
val iterMax = Math.max(400, 20 * n)
var lastNorm = 0.0
var iterno = 0
var lastWall = 0 // Last iteration when we hit a bound constraint.
var i = 0
while (iterno < iterMax) {
// find the residual
SimpleBlas.gemv(1.0, ata, x, 0.0, res)
SimpleBlas.axpy(-1.0, atb, res)
SimpleBlas.copy(res, grad)

// project the gradient
i = 0
while (i < n) {
if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
grad.data(i) = 0.0
}
i = i + 1
}
val ngrad = SimpleBlas.dot(grad, grad)

SimpleBlas.copy(grad, dir)

// use a CG direction under certain conditions
var step = steplen(grad, res)
var ndir = 0.0
val nx = SimpleBlas.dot(x, x)
if (iterno > lastWall + 1) {
val alpha = ngrad / lastNorm
SimpleBlas.axpy(alpha, lastDir, dir)
val dstep = steplen(dir, res)
ndir = SimpleBlas.dot(dir, dir)
if (stop(dstep, ndir, nx)) {
// reject the CG step if it could lead to premature termination
SimpleBlas.copy(grad, dir)
ndir = SimpleBlas.dot(dir, dir)
} else {
step = dstep
}
} else {
ndir = SimpleBlas.dot(dir, dir)
}

// terminate?
if (stop(step, ndir, nx)) {
return x.data.clone
}

// don't run through the walls
i = 0
while (i < n) {
if (step * dir.data(i) > x.data(i)) {
step = x.data(i) / dir.data(i)
}
i = i + 1
}

// take the step
i = 0
while (i < n) {
if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
x.data(i) = 0
lastWall = iterno
} else {
x.data(i) -= step * dir.data(i)
}
i = i + 1
}

iterno = iterno + 1
SimpleBlas.copy(dir, lastDir)
lastNorm = ngrad
}
x.data.clone
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.SparkContext._
import org.apache.spark.util.Utils
import org.apache.spark.mllib.optimization.NNLS

/**
* Out-link information for a user or product block. This includes the original user/product IDs
Expand Down Expand Up @@ -158,6 +159,18 @@ class ALS private (
this
}

/** If true, do alternating nonnegative least squares. */
private var nonnegative = false

/**
* Set whether the least-squares problems solved at each iteration should have
* nonnegativity constraints.
*/
def setNonnegative(b: Boolean): ALS = {
this.nonnegative = b
this
}

/**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
Expand Down Expand Up @@ -499,6 +512,8 @@ class ALS private (
}
}

val ws = if (nonnegative) NNLS.createWorkspace(rank) else null

// Solve the least-squares problem for each user and return the new feature vectors
Array.range(0, numUsers).map { index =>
// Compute the full XtX matrix from the lower-triangular part we got above
Expand All @@ -507,13 +522,26 @@ class ALS private (
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
// Solve the resulting matrix, which is symmetric and positive-definite
if (implicitPrefs) {
Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws)
} else {
Solve.solvePositive(fullXtX, userXy(index)).data
solveLeastSquares(fullXtX, userXy(index), ws)
}
}
}

/**
* Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
* to nonnegativity constraints if `nonnegative` is true.
*/
def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix,
ws: NNLS.Workspace): Array[Double] = {
if (!nonnegative) {
Solve.solvePositive(ata, atb).data
} else {
NNLS.solve(ata, atb, ws)
}
}

/**
* Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
* matrix that it represents, storing it into destMatrix.
Expand All @@ -540,7 +568,6 @@ class ALS private (
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
*/
object ALS {

/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.optimization

import scala.util.Random

import org.scalatest.FunSuite

import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas}

class NNLSSuite extends FunSuite {
/** Generate an NNLS problem whose optimal solution is the all-ones vector. */
def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = {
val A = new DoubleMatrix(n, n, Array.fill(n*n)(rand.nextDouble()): _*)
val b = A.mmul(DoubleMatrix.ones(n, 1))

val ata = A.transpose.mmul(A)
val atb = A.transpose.mmul(b)

(ata, atb)
}

test("NNLS: exact solution cases") {
val n = 20
val rand = new Random(12346)
val ws = NNLS.createWorkspace(n)
var numSolved = 0

// About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3. NNLS
// can legitimately fail to solve these anywhere close to exactly. So we grab a considerable
// sample of these matrices and make sure that we solved a substantial fraction of them.

for (k <- 0 until 100) {
val (ata, atb) = genOnesData(n, rand)
val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
assert(x.length === n)
val answer = DoubleMatrix.ones(n, 1)
SimpleBlas.axpy(-1.0, answer, x)
val solved = (x.norm2 < 1e-2) && (x.normmax < 1e-3)
if (solved) numSolved = numSolved + 1
}

assert(numSolved > 50)
}

test("NNLS: nonnegativity constraint active") {
val n = 5
val ata = new DoubleMatrix(Array(
Array( 4.377, -3.531, -1.306, -0.139, 3.418),
Array(-3.531, 4.344, 0.934, 0.305, -2.140),
Array(-1.306, 0.934, 2.644, -0.203, -0.170),
Array(-0.139, 0.305, -0.203, 5.883, 1.428),
Array( 3.418, -2.140, -0.170, 1.428, 4.684)))
val atb = new DoubleMatrix(Array(-1.632, 2.115, 1.094, -1.025, -0.636))

val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)

val ws = NNLS.createWorkspace(n)
val x = NNLS.solve(ata, atb, ws)
for (i <- 0 until n) {
assert(Math.abs(x(i) - goodx(i)) < 1e-3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to assert x(i) >= 0.

assert(x(i) >= 0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ object ALSSuite {
features: Int,
samplingRate: Double,
implicitPrefs: Boolean = false,
negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
negativeWeights: Boolean = false,
negativeFactors: Boolean = true): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
val rand = new Random(42)

// Create a random matrix with uniform values from -1 to 1
def randomMatrix(m: Int, n: Int) =
new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
def randomMatrix(m: Int, n: Int) = {
if (negativeFactors) {
new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
} else {
new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble()): _*)
}
}

val userMatrix = randomMatrix(users, features)
val productMatrix = randomMatrix(features, products)
Expand Down Expand Up @@ -147,6 +153,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
}
}

test("NNALS, rank 2") {
testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
}

/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
Expand All @@ -160,19 +170,19 @@ class ALSSuite extends FunSuite with LocalSparkContext {
* @param bulkPredict flag to test bulk prediciton
* @param negativeWeights whether the generated data can contain negative values
* @param numBlocks number of blocks to partition users and products into
* @param negativeFactors whether the generated user/product factors can have negative entries
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
negativeFactors: Boolean = true)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
features, samplingRate, implicitPrefs, negativeWeights)
val model = implicitPrefs match {
case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 0L)
case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
numBlocks, 1.0, 0L)
}
features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)

val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
.setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
.setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))

val predictedU = new DoubleMatrix(users, features)
for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
Expand Down