From e3082faddbf2e1449310e1774f43dd44f0b32683 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 30 Apr 2014 12:53:46 +0530 Subject: [PATCH 1/7] SPARK-1668: Add implicit preference as an option to examples/MovieLensALS Add --implicitPrefs as a command-line option to the example app MovieLensALS under examples/ --- .../org/apache/spark/examples/mllib/MovieLensALS.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 703f02255b94b..5e57bf970f00d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -43,7 +43,8 @@ object MovieLensALS { kryo: Boolean = false, numIterations: Int = 20, lambda: Double = 1.0, - rank: Int = 10) + rank: Int = 10, + implicitPrefs: Boolean = false) def main(args: Array[String]) { val defaultParams = Params() @@ -62,6 +63,9 @@ object MovieLensALS { opt[Unit]("kryo") .text(s"use Kryo serialization") .action((_, c) => c.copy(kryo = true)) + opt[Unit]("implicitPrefs") + .text(s"use Implicit Preference") + .action((_, c) => c.copy(implicitPrefs = true)) arg[String]("") .required() .text("input paths to a MovieLens dataset of ratings") @@ -111,6 +115,7 @@ object MovieLensALS { .setRank(params.rank) .setIterations(params.numIterations) .setLambda(params.lambda) + .setImplicitPrefs(params.implicitPrefs) .run(training) val rmse = computeRmse(model, test, numTest) From 42444d738d3270c00306347a3f6f3987cb6a70cc Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 30 Apr 2014 15:52:37 +0530 Subject: [PATCH 2/7] Based on Suggestions by mengxr --- .../org/apache/spark/examples/mllib/MovieLensALS.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 5e57bf970f00d..d8788014c3177 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -92,7 +92,11 @@ object MovieLensALS { val ratings = sc.textFile(params.input).map { line => val fields = line.split("::") - Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) + if (params.implicitPrefs) { + Rating(fields(0).toInt, fields(1).toInt, if (fields(2).toInt >= 3) 1.0 else 0.0) + } else { + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) + } }.cache() val numRatings = ratings.count() From 1dd7657d92ec361cb377e26eb87d9e88cadd65e1 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 6 May 2014 12:17:42 +0530 Subject: [PATCH 3/7] use mean() --- .../scala/org/apache/spark/examples/mllib/MovieLensALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index d8788014c3177..75e49b2f9e57b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -135,6 +135,6 @@ object MovieLensALS { val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) .join(data.map(x => ((x.user, x.product), x.rating))) .values - math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n) + math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) } } From 5149d40659545856e868e511a52d9b64099df033 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 6 May 2014 12:50:03 +0530 Subject: [PATCH 4/7] Changes based on review --- .../scala/org/apache/spark/examples/mllib/MovieLensALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 75e49b2f9e57b..000f47b9ed63f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -93,7 +93,7 @@ object MovieLensALS { val ratings = sc.textFile(params.input).map { line => val fields = line.split("::") if (params.implicitPrefs) { - Rating(fields(0).toInt, fields(1).toInt, if (fields(2).toInt >= 3) 1.0 else 0.0) + Rating(fields(0).toInt, fields(1).toInt, if (fields(2).toDouble >= 2.5) 1.0 else 0.0) } else { Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) } From 937e54caf7a8483ca0e81f877522b96df7f4c373 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 6 May 2014 14:28:42 +0530 Subject: [PATCH 5/7] Changes --- .../spark/examples/mllib/MovieLensALS.scala | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 000f47b9ed63f..023481133d74d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -93,7 +93,7 @@ object MovieLensALS { val ratings = sc.textFile(params.input).map { line => val fields = line.split("::") if (params.implicitPrefs) { - Rating(fields(0).toInt, fields(1).toInt, if (fields(2).toDouble >= 2.5) 1.0 else 0.0) + Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) } else { Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) } @@ -107,7 +107,12 @@ object MovieLensALS { val splits = ratings.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() - val test = splits(1).cache() + val test = if (params.implicitPrefs) { + splits(1) + 
.map(x => Rating(x.user, x.product, if(x.rating >= 0) 1.0 else 0.0)) + } else { + splits(1) + }.cache() val numTraining = training.count() val numTest = test.count() @@ -122,7 +127,7 @@ object MovieLensALS { .setImplicitPrefs(params.implicitPrefs) .run(training) - val rmse = computeRmse(model, test, numTest) + val rmse = computeRmse(model, test, params) println(s"Test RMSE = $rmse.") @@ -130,11 +135,17 @@ object MovieLensALS { } /** Compute RMSE (Root Mean Squared Error). */ - def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long) = { + def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], params: Params) = { val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) - val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) - .join(data.map(x => ((x.user, x.product), x.rating))) - .values - math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) + val predictionsAndRatings = if (params.implicitPrefs) { + predictions.map(x => ( + (x.user, x.product), + if (x.rating > 1.0) 1.0 else if (x.rating < 0.0) 0.0 else x.rating + )).join(data.map(x => ((x.user, x.product), x.rating))) + } else { + predictions.map(x => ((x.user, x.product), x.rating)) + .join(data.map(x => ((x.user, x.product), x.rating))) + } + math.sqrt(predictionsAndRatings.values.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) } } From eca9d37a2f2cb261a12e4c550b89a283803897c2 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 7 May 2014 12:32:47 +0530 Subject: [PATCH 6/7] based on mengxr's suggestions --- .../spark/examples/mllib/MovieLensALS.scala | 49 +++++++++++++------ 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 023481133d74d..f959791e10fd6 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -64,7 +64,7 @@ object MovieLensALS { .text(s"use Kryo serialization") .action((_, c) => c.copy(kryo = true)) opt[Unit]("implicitPrefs") - .text(s"use Implicit Preference") + .text(s"use implicit preference") .action((_, c) => c.copy(implicitPrefs = true)) arg[String]("") .required() @@ -93,6 +93,22 @@ object MovieLensALS { val ratings = sc.textFile(params.input).map { line => val fields = line.split("::") if (params.implicitPrefs) { + /** + * MovieLens ratings are on a scale of 1-5: + * 5: Must see + * 4: Will enjoy + * 3: It's okay + * 2: Fairly bad + * 1: Awful + * So we should not recommend a movie if the predicted rating is less than 3. + * To map ratings to confidence scores, we use + * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved + * entries are generally between It's okay and Fairly bad. + * The semantics of 0 in this expanded world of non-positive weights + * are "the same as never having interacted at all" + * It's possible that 0 values are ignored when constructing the sparse representation, + * because the 0s are implicit. This would be a problem, at least, a theoretical one. + */ Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) } else { Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) @@ -108,8 +124,14 @@ object MovieLensALS { val splits = ratings.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() val test = if (params.implicitPrefs) { - splits(1) - .map(x => Rating(x.user, x.product, if(x.rating >= 0) 1.0 else 0.0)) + /** + * 0 means "don't know" and positive values mean "confident that the prediction should be 1". + * Negative values means "confident that the prediction should be 0". + * We have in this case used some kind of weighted RMSE. The weight is the absolute value of + * the confidence. 
The error is the difference between prediction and either 1 or 0, + * depending on whether r is positive or negative. + */ + splits(1).map(x => Rating(x.user, x.product, if(x.rating > 0) 1.0 else 0.0)) } else { splits(1) }.cache() @@ -127,7 +149,7 @@ object MovieLensALS { .setImplicitPrefs(params.implicitPrefs) .run(training) - val rmse = computeRmse(model, test, params) + val rmse = computeRmse(model, test, params.implicitPrefs) println(s"Test RMSE = $rmse.") @@ -135,17 +157,14 @@ object MovieLensALS { } /** Compute RMSE (Root Mean Squared Error). */ - def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], params: Params) = { + def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean) = { + + def evalRating(r: Double) = + if (!implicitPrefs) r else if (r > 1.0) 1.0 else if (r < 0.0) 0.0 else r + val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) - val predictionsAndRatings = if (params.implicitPrefs) { - predictions.map(x => ( - (x.user, x.product), - if (x.rating > 1.0) 1.0 else if (x.rating < 0.0) 0.0 else x.rating - )).join(data.map(x => ((x.user, x.product), x.rating))) - } else { - predictions.map(x => ((x.user, x.product), x.rating)) - .join(data.map(x => ((x.user, x.product), x.rating))) - } - math.sqrt(predictionsAndRatings.values.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) + val predictionsAndRatings = predictions.map(x => ((x.user, x.product), evalRating(x.rating))) + .join(data.map(x => ((x.user, x.product), x.rating))).values + math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) } } From 8b371dc115077151003bf3fa06a2e6bcd08434fd Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 7 May 2014 22:43:06 +0530 Subject: [PATCH 7/7] Second Pass on reviews by mengxr --- .../spark/examples/mllib/MovieLensALS.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index f959791e10fd6..0e4447e0de24f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -64,7 +64,7 @@ object MovieLensALS { .text(s"use Kryo serialization") .action((_, c) => c.copy(kryo = true)) opt[Unit]("implicitPrefs") - .text(s"use implicit preference") + .text("use implicit preference") .action((_, c) => c.copy(implicitPrefs = true)) arg[String]("") .required() @@ -93,7 +93,7 @@ object MovieLensALS { val ratings = sc.textFile(params.input).map { line => val fields = line.split("::") if (params.implicitPrefs) { - /** + /* * MovieLens ratings are on a scale of 1-5: * 5: Must see * 4: Will enjoy @@ -105,9 +105,7 @@ object MovieLensALS { * 5 -> 2.5, 4 -> 1.5, 3 -> 0.5, 2 -> -0.5, 1 -> -1.5. This mappings means unobserved * entries are generally between It's okay and Fairly bad. * The semantics of 0 in this expanded world of non-positive weights - * are "the same as never having interacted at all" - * It's possible that 0 values are ignored when constructing the sparse representation, - * because the 0s are implicit. This would be a problem, at least, a theoretical one. + * are "the same as never having interacted at all". */ Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5) } else { @@ -124,14 +122,14 @@ object MovieLensALS { val splits = ratings.randomSplit(Array(0.8, 0.2)) val training = splits(0).cache() val test = if (params.implicitPrefs) { - /** + /* * 0 means "don't know" and positive values mean "confident that the prediction should be 1". * Negative values means "confident that the prediction should be 0". * We have in this case used some kind of weighted RMSE. The weight is the absolute value of * the confidence. 
The error is the difference between prediction and either 1 or 0, * depending on whether r is positive or negative. */ - splits(1).map(x => Rating(x.user, x.product, if(x.rating > 0) 1.0 else 0.0)) + splits(1).map(x => Rating(x.user, x.product, if (x.rating > 0) 1.0 else 0.0)) } else { splits(1) }.cache() @@ -159,12 +157,12 @@ object MovieLensALS { /** Compute RMSE (Root Mean Squared Error). */ def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean) = { - def evalRating(r: Double) = - if (!implicitPrefs) r else if (r > 1.0) 1.0 else if (r < 0.0) 0.0 else r + def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) - val predictionsAndRatings = predictions.map(x => ((x.user, x.product), evalRating(x.rating))) - .join(data.map(x => ((x.user, x.product), x.rating))).values + val predictionsAndRatings = predictions.map{ x => + ((x.user, x.product), mapPredictedRating(x.rating)) + }.join(data.map(x => ((x.user, x.product), x.rating))).values math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) } }