mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala (22 changes: 13 additions & 9 deletions)

@@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
/**
* Representing a normal equation to solve the following weighted least squares problem:
*
- * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x.
+ * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x.
*
* Its normal equation is given by
*
- * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0.
+ * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0.
+ *
+ * Distributing and letting b,,i,, = c,,i,, * d,,i,,
+ *
+ * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0.
*/
private[recommendation] class NormalEquation(val k: Int) extends Serializable {

@@ -796,7 +800,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
copyToDouble(a)
blas.dspr(upper, k, c, da, 1, ata)
if (b != 0.0) {
- blas.daxpy(k, c * b, da, 1, atb, 1)
+ blas.daxpy(k, b, da, 1, atb, 1)
}
this
}
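To see the updated contract in isolation: add now expects the caller to premultiply the right-hand side by the confidence, so A^T b accumulates b * a directly while A^T A still accumulates c * a * a^T. A minimal BLAS-free sketch of that contract (a hypothetical TinyNormalEquation, not the Spark class):

class TinyNormalEquation(val k: Int) {
  val ata = Array.ofDim[Double](k * k) // A^T A, stored dense for clarity
  val atb = Array.ofDim[Double](k)     // A^T b

  // b must arrive premultiplied by the confidence c, i.e. b = c * d.
  def add(a: Array[Double], b: Double, c: Double = 1.0): this.type = {
    require(a.length == k)
    for (i <- 0 until k; j <- 0 until k) ata(i * k + j) += c * a(i) * a(j)
    if (b != 0.0) for (i <- 0 until k) atb(i) += b * a(i) // no hidden c * b
    this
  }
}

For explicit feedback c defaults to 1, so b is just the rating and nothing changes; the implicit path can now pass 1 + c1 directly.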
@@ -1624,15 +1628,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
val srcFactor = sortedSrcFactors(blockId)(localIndex)
val rating = ratings(i)
if (implicitPrefs) {
- // Extension to the original paper to handle b < 0. confidence is a function of |b|
- // instead so that it is never negative. c1 is confidence - 1.0.
+ // Extension to the original paper to handle rating < 0. confidence is a function
+ // of |rating| instead so that it is never negative. c1 is confidence - 1.
val c1 = alpha * math.abs(rating)
- // For rating <= 0, the corresponding preference is 0. So the term below is only added
- // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
- if (rating > 0) {
+ // For rating <= 0, the corresponding preference is 0. So the second argument of add
+ // is only there for rating > 0.
+ if (rating > 0.0) {
numExplicits += 1
- ls.add(srcFactor, (c1 + 1.0) / c1, c1)
}
+ ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1)
Member:
Is this the substance of the change? I might need some help understanding why this is needed. Yes, even negative values should be recorded for implicit prefs, I agree. It adds 1 + c1 now instead of (1 + c1) / c1, so that's why the factor of c is taken out above?

Author:
Correct, this is the crux of the change (moving the ls.add call outside the if condition). Changing the arguments was meant to be less confusing and more direct: previously it was unclear where (1 + c1) / c1 came from, and when it reached add it was multiplied by c1 again, a wasted operation that may not even yield exactly 1 + c1 in the end.

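To make the arithmetic in this exchange concrete, a small worked example (plain Scala, with an arbitrary alpha and rating, not taken from the patch):

val alpha = 1.0
val rating = 4.0
val c1 = alpha * math.abs(rating)     // confidence - 1, here 4.0
val oldCoeff = c1 * ((c1 + 1.0) / c1) // old path: divide, then multiply back
val newCoeff = 1.0 + c1               // new path: pass 1 + c1 directly
assert(math.abs(oldCoeff - newCoeff) < 1e-12) // both 5.0

The coefficients agree, but the new form skips the wasted round trip, and the add call itself now also runs for rating <= 0, contributing confidence c1 with preference 0, which the old code skipped entirely.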
} else {
ls.add(srcFactor, rating)
numExplicits += 1
mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala

@@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
@@ -78,7 +79,7 @@ class ALSSuite
val k = 2
val ne0 = new NormalEquation(k)
.add(Array(1.0f, 2.0f), 3.0)
- .add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted
+ .add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted
Member:
Was this test change intentional?

Author:
Yes, this test change was intentional, because I changed the semantic meaning of the arguments to add: before, add multiplied the second and third arguments together internally, so to keep this test valid I premultiplied them. In ALS.scala the non-implicit path passes 1 as the third argument, so nothing changes there, and the implicit path is now handled correctly.

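In other words, a quick check with the test's own numbers (hypothetical, just to show the premultiplication): under the old semantics add(a, 6.0, 2.0) contributed 2.0 * 6.0 * a to A^T b; under the new semantics the caller passes the product itself:

val a = Array(4.0, 5.0)
val oldAtb = a.map(_ * (2.0 * 6.0)) // Array(48.0, 60.0)
val newAtb = a.map(_ * 12.0)        // Array(48.0, 60.0), identical

so premultiplying the test's arguments preserves the expected values.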
assert(ne0.k === k)
assert(ne0.triK === k * (k + 1) / 2)
// NumPy code that computes the expected values:
@@ -347,6 +348,37 @@ class ALSSuite
ALSSuite.genFactors(size, rank, random, a, b)
}

/**
* Train ALS using the given training set and parameters
* @param training training dataset
* @param rank rank of the matrix factorization
* @param maxIter max number of iterations
* @param regParam regularization constant
* @param implicitPrefs whether to use implicit preference
* @param numUserBlocks number of user blocks
* @param numItemBlocks number of item blocks
* @return a trained ALSModel
*/
def trainALS(
Member:
Why do you need a new overload?

Author:
It is a helper function; I call it twice in the test. I also wanted to use it in the testALS function, but that wasn't straightforward. I can't use testALS in my test because it does more than just train the model, and it doesn't let me compare the two models the test generates, one trained with negative values and one with those negative values zeroed out.

training: RDD[Rating[Int]],
rank: Int,
maxIter: Int,
regParam: Double,
implicitPrefs: Boolean = false,
numUserBlocks: Int = 2,
numItemBlocks: Int = 3): ALSModel = {
val spark = this.spark
import spark.implicits._
val als = new ALS()
.setRank(rank)
.setMaxIter(maxIter)
.setRegParam(regParam)
.setImplicitPrefs(implicitPrefs)
.setNumUserBlocks(numUserBlocks)
.setNumItemBlocks(numItemBlocks)
.setSeed(0)
als.fit(training.toDF())
}
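For reference, the regression test below invokes this helper twice with identical parameters (rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true); since the seed is fixed at 0, the two resulting models can differ only because of their training data.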

/**
* Test ALS using the given training/test splits and parameters.
* @param training training dataset
Expand Down Expand Up @@ -455,6 +487,22 @@ class ALSSuite
targetRMSE = 0.3)
}

test("implicit feedback regression") {
val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3)))
val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0)))
val modelWithNeg =
trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
val modelWithZero =
trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
val userFactorsNeg = modelWithNeg.userFactors
val itemFactorsNeg = modelWithNeg.itemFactors
val userFactorsZero = modelWithZero.userFactors
val itemFactorsZero = modelWithZero.itemFactors
userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
Contributor:
Small nit here, but ideally we don't log info during this sort of test?

Author:
Good point, I meant to remove it. Shall I open another PR?

userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
assert(userFactorsNeg.intersect(userFactorsZero).count() == 0)
assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0)
}
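A note on why these assertions should hold (reasoning from the patched code above, not stated in the PR discussion): with the fix, Rating(0, 1, -3) contributes confidence c1 = 3 * alpha with preference 0 to the normal equation, while Rating(0, 1, 0) has c1 = 0 and contributes nothing, so the two models see genuinely different data and their factors should not coincide.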
test("using generic ID types") {
val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01)
