From 412b1b5f24500710c91184e97e09b3b0e1830273 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Fri, 15 Apr 2016 14:09:16 +0200 Subject: [PATCH 1/7] initial work on adding 'recommendAll' methods to ML ALS --- .../apache/spark/ml/recommendation/ALS.scala | 109 ++++++++++++++++++ .../MatrixFactorizationModel.scala | 89 +++++++++----- .../spark/ml/recommendation/ALSSuite.scala | 49 ++++++++ 3 files changed, 220 insertions(+), 27 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 02e2384afe53..41bae53b8f60 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -90,6 +90,31 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo n.toInt } } + + /** + * Param for number of recommendations to make. If set, calling [[ALSModel.transform()]] will + * generate the top `k` recommendations for each user or item, depending on the value of + * [[recommendFor]]. + * @group param + */ + val k = new Param[Int](this, "k", "number of recommendations to make. If set, calling " + + "'transform` will generate the top 'k' recommended items for each user or item, depending on" + + " the value of 'recommendFor'.", ParamValidators.gt(0)) + + /** @group getParam */ + def getK: Int = $(k) + + /** + * Param for whether to generate recommendations for users or items. + * Supported values: `user`, `item`. If set, [[k]] must also be set. + * @group param + */ + val recommendFor = new Param[String](this, "recommendFor", "whether to generate " + + "recommendations for users or items. Supported values: 'user', 'item'. " + + "If set, 'k' must also be set.", ParamValidators.inArray(Array("user", "item"))) + + /** @group getParam */ + def getRecommendFor: String = $(recommendFor) } /** @@ -248,7 +273,15 @@ class ALSModel private[ml] ( @Since("1.3.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + /** @group setParam */ + @Since("2.0.0") + def setK(value: Int): this.type = set(k, value) + + /** @group setParam */ @Since("2.0.0") + def setRecommendFor(value: String): this.type = set(recommendFor, value) + + @Since("1.3.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema) // Register a UDF for DataFrame, and then @@ -269,6 +302,55 @@ class ALSModel private[ml] ( predict(userFactors("features"), itemFactors("features")).as($(predictionCol))) } + /** + * Generate top `num` recommended items for each user (or recommended users for each item) in the + * input [[DataFrame]]. + * @param dataset [[DataFrame]] containing a column of user or item ids for which to recommend. + * @param srcFactors user / item factors for which to generate top-k recommendations. + * @param dstFactors candidate user / item factors from which to generate top-k recommendations. + * @param srcCol name of column containing id for which to recommend. 
+   * @param num how many recommended items (or users) to compute for each user (or item)
+   * @return [[DataFrame]] containing `dataset` with a column `predictions` appended
+   */
+  private def recommendUsersOrItems(
+      dataset: DataFrame,
+      srcFactors: DataFrame,
+      dstFactors: DataFrame,
+      srcCol: String,
+      num: Int): DataFrame = {
+    import dataset.sqlContext.implicits._
+    val factors = dataset
+      .join(srcFactors, dataset(srcCol) === srcFactors("id"), "left")
+      .select(srcFactors("id"), srcFactors("features"))
+    val topK = ALSModel.recommendForAll(rank, factors, dstFactors, num)
+      .toDF("id", "predictions")
+    dataset
+      .join(topK, dataset(srcCol) === topK("id"))
+      .select(dataset("*"), topK("predictions"))
+  }
+
+  /**
+   * Generate top `num` recommended items for each user id in the input [[DataFrame]].
+   * @param dataset input [[DataFrame]]. The user id column is set by [[userCol]].
+   * @param num number of items to recommend for each user.
+   * @return input [[DataFrame]], with a column `predictions` appended.
+   */
+  @Since("2.0.0")
+  def recommendItems(dataset: DataFrame, num: Int): DataFrame = {
+    recommendUsersOrItems(dataset, userFactors, itemFactors, $(userCol), num)
+  }
+
+  /**
+   * Generate top `num` recommended users for each item in the input [[DataFrame]].
+   * @param dataset input [[DataFrame]]. The item id column is set by [[itemCol]].
+   * @param num number of users to recommend for each item.
+   * @return input [[DataFrame]], with a column `predictions` appended.
+   */
+  @Since("2.0.0")
+  def recommendUsers(dataset: DataFrame, num: Int): DataFrame = {
+    recommendUsersOrItems(dataset, itemFactors, userFactors, $(itemCol), num)
+  }
+
   @Since("1.3.0")
   override def transformSchema(schema: StructType): StructType = {
     // user and item will be cast to Int
@@ -328,6 +410,33 @@ object ALSModel extends MLReadable[ALSModel] {
       model
     }
   }
+
+  /**
+   * Makes recommendations for all users (or items).
+   *
+   * @param rank the dimension of the factor vectors
+   * @param srcFactors src factors to receive recommendations
+   * @param dstFactors dst factors used to make recommendations
+   * @param num number of recommendations for each user (or item)
+   * @return an RDD of (srcId, recommendations) pairs, where recommendations are stored as an array
+   *         of (dstId, score) pairs.
+   */
+  private def recommendForAll(
+      rank: Int,
+      srcFactors: DataFrame,
+      dstFactors: DataFrame,
+      num: Int): RDD[(Int, Array[(Int, Float)])] = {
+    import srcFactors.sqlContext.implicits._
+    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
+
+    val srcFactorsRDD = srcFactors.as[(Int, Array[Float])].rdd
+    val dstFactorsRDD = dstFactors.as[(Int, Array[Float])].rdd
+    MatrixFactorizationModel.recommendForAll(rank, srcFactorsRDD, dstFactorsRDD, num)
+  }
+}
+
+object ALSModelUtil {
+
 }
 
 /**
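For orientation, a minimal usage sketch of the methods added above (a sketch only: `model` is assumed to be a fitted ALSModel with userCol "user", and a SQLContext is assumed in scope; patch 3 of this series later makes these methods private and routes them through transform()):

    import sqlContext.implicits._

    // one row per user id for which to generate recommendations
    val users = Seq(0, 1, 2).toDF("user")

    // appends a 'predictions' column of the top 2 (item id, score) pairs per user
    val top2 = model.recommendItems(users, 2)
    top2.show()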
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index c642573ccba6..f7dd670ed3e5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -21,6 +21,8 @@ import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
 import scala.collection.mutable
+import scala.math.Numeric.DoubleIsFractional
+import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
@@ -33,7 +35,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -244,7 +245,6 @@ class MatrixFactorizationModel @Since("0.8.0") (
 
 @Since("1.3.0")
 object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
-
   import org.apache.spark.mllib.util.Loader._
 
   /**
@@ -261,50 +261,85 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
   }
 
   /**
-   * Makes recommendations for all users (or products).
-   * @param rank rank
-   * @param srcFeatures src features to receive recommendations
-   * @param dstFeatures dst features used to make recommendations
-   * @param num number of recommendations for each record
-   * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
-   *         of (dstId, rating) pairs.
+   * Makes recommendations for all users (or items).
+   *
+   * @param rank the dimension of the factor vectors
+   * @param srcFactors src factors to receive recommendations
+   * @param dstFactors dst factors used to make recommendations
+   * @param num number of recommendations for each user (or item)
+   * @return an RDD of (srcId, recommendations) pairs, where recommendations are stored as an array
+   *         of (dstId, score) pairs.
   */
-  private def recommendForAll(
+  private[spark] def recommendForAll[T: ClassTag : Fractional](
       rank: Int,
-      srcFeatures: RDD[(Int, Array[Double])],
-      dstFeatures: RDD[(Int, Array[Double])],
-      num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(rank, srcFeatures)
-    val dstBlocks = blockify(rank, dstFeatures)
+      srcFactors: RDD[(Int, Array[T])],
+      dstFactors: RDD[(Int, Array[T])],
+      num: Int): RDD[(Int, Array[(Int, T)])] = {
+    val srcBlocks = blockify(rank, srcFactors)
+    val dstBlocks = blockify(rank, dstFactors)
+    val tag = implicitly[ClassTag[T]].runtimeClass
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
       case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
         val m = srcIds.length
         val n = dstIds.length
-        val ratings = srcFactors.transpose.multiply(dstFactors)
-        val output = new Array[(Int, (Int, Double))](m * n)
-        var k = 0
-        ratings.foreachActive { (i, j, r) =>
-          output(k) = (srcIds(i), (dstIds(j), r))
-          k += 1
+        val targetMatrix = new Array[T](m * n)
+        tag match {
+          case java.lang.Float.TYPE =>
+            val A = srcFactors.asInstanceOf[Array[Float]]
+            val B = dstFactors.asInstanceOf[Array[Float]]
+            val C = targetMatrix.asInstanceOf[Array[Float]]
+            blas.sgemm("T", "N", m, n, rank, 1f, A, rank, B, rank, 0f, C, m)
+          case java.lang.Double.TYPE =>
+            val A = srcFactors.asInstanceOf[Array[Double]]
+            val B = dstFactors.asInstanceOf[Array[Double]]
+            val C = targetMatrix.asInstanceOf[Array[Double]]
+            blas.dgemm("T", "N", m, n, rank, 1.0, A, rank, B, rank, 0.0, C, m)
         }
-        output.toSeq
+        fillPredictions(srcIds, dstIds, targetMatrix, m, n).toSeq
     }
     ratings.topByKey(num)(Ordering.by(_._2))
   }
 
+  /**
+   * Fills an array of (srcId, (dstId, score)) tuples from the result of multiplying the
+   * user and item factor matrices.
+   */
+  private def fillPredictions[T : ClassTag](
+      srcIds: Array[Int],
+      dstIds: Array[Int],
+      predictions: Array[T],
+      m: Int,
+      n: Int): Array[(Int, (Int, T))] = {
+    val output = new Array[(Int, (Int, T))](m * n)
+    // outer loop over columns
+    var k = 0
+    var j = 0
+    while (j < n) {
+      var i = 0
+      val indStart = j * m
+      while (i < m) {
+        output(k) = (srcIds(i), (dstIds(j), predictions(indStart + i)))
+        i += 1
+        k += 1
+      }
+      j += 1
+    }
+    output
+  }
+
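A note on the gemm calls above: BLAS writes the m-by-n score matrix in column-major order, which is why fillPredictions indexes predictions(j * m + i) with its outer loop over columns. A plain-Scala sketch of the same computation, reusing the factor values from the test suite below (illustrative only, no Spark or BLAS):

    val m = 2                                 // src (user) vectors in this block
    val n = 3                                 // dst (item) vectors in this block
    val rank = 2
    val src = Array(6f, 4f, 3f, 4f)           // users 0 and 1, stored rank-major
    val dst = Array(5f, 6f, 6f, 2f, 3f, 6f)   // items 3, 4 and 5, stored rank-major
    val scores = new Array[Float](m * n)
    for (j <- 0 until n; i <- 0 until m; l <- 0 until rank) {
      // gemm("T", "N", ...): scores(i, j) = src vector i dot dst vector j
      scores(j * m + i) += src(i * rank + l) * dst(j * rank + l)
    }
    // scores is now [54.0, 39.0, 44.0, 26.0, 42.0, 33.0]: column j holds every
    // user's score against item j, matching the expected values in the
    // "recommend top k" test below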
   /**
    * Blockifies features to use Level-3 BLAS.
    */
-  private def blockify(
+  private def blockify[T: ClassTag](
       rank: Int,
-      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
-    val blockSize = 4096 // TODO: tune the block size
+      features: RDD[(Int, Array[T])],
+      blockSize: Int = 4096): RDD[(Array[Int], Array[T])] = {
     val blockStorage = rank * blockSize
     features.mapPartitions { iter =>
       iter.grouped(blockSize).map { grouped =>
         val ids = mutable.ArrayBuilder.make[Int]
         ids.sizeHint(blockSize)
-        val factors = mutable.ArrayBuilder.make[Double]
+        val factors = mutable.ArrayBuilder.make[T]
         factors.sizeHint(blockStorage)
         var i = 0
         grouped.foreach { case (id, factor) =>
@@ -312,7 +347,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
           factors ++= factor
           i += 1
         }
-        (ids.result(), new DenseMatrix(rank, i, factors.result()))
+        (ids.result(), factors.result())
       }
     }
   }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index e8ed50acf877..0c8448465201 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -539,6 +539,55 @@ class ALSSuite
       }.getMessage.contains("was out of Integer range"))
     }
   }
+
+  /**
+   * Validate expected vs actual predictions.
+   * @param data
+   */
+  private def validateRecommendations(data: Seq[(Seq[(Int, Float)], Seq[(Int, Float)])]) = {
+    data.foreach { case (expected, actual) =>
+      expected.zip(actual).foreach { case ((id1, score1), (id2, score2)) =>
+        assert(id1 === id2)
+        assert(score1.toDouble ~== score2.toDouble relTol 1e-14)
+      }
+    }
+  }
+
+  test("recommend top k") {
+    val sqlContext = this.sqlContext
+    import sqlContext.implicits._
+    val als = new ALS().setRank(2)
+    val k = 2
+    val users = Seq(
+      (0, Array(6.0f, 4.0f), Array((3, 54.0f), (4, 44.0f))),
+      (1, Array(3.0f, 4.0f), Array((3, 39.0f), (5, 33.0f))),
+      (2, Array(3.0f, 6.0f), Array((3, 51.0f), (5, 45.0f)))
+    ).toDF("user", "features", "expected")
+    val items = Seq(
+      (3, Array(5.0f, 6.0f), Array((0, 54.0f), (2, 51.0f))),
+      (4, Array(6.0f, 2.0f), Array((0, 44.0f), (2, 30.0f))),
+      (5, Array(3.0f, 6.0f), Array((2, 45.0f), (0, 42.0f)))
+    ).toDF("item", "features", "expected")
+    val userFactors = users.select("user", "features").withColumnRenamed("user", "id")
+    val itemFactors = items.select("item", "features").withColumnRenamed("item", "id")
+    // construct model and check recommendations
+    val model = new ALSModel(als.uid, als.getRank, userFactors, itemFactors)
+      .setUserCol("user")
+      .setItemCol("item")
+    // validate user recommendations
+    val topKItems = model.recommendItems(users, k)
+      .select("expected", "predictions")
+      .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect()
+    validateRecommendations(topKItems)
+    // validate item recommendations
+    val topKUsers = model.recommendUsers(items, k)
+      .select("expected", "predictions")
+      .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect()
+    validateRecommendations(topKUsers)
+    // check that using a subset of input only generates recommendations for that subset
+    val filteredTopK = model.recommendItems(users.filter(users("user") > 0), k)
+    assert(filteredTopK.count == 2)
+  }
 }
 
 class ALSCleanerSuite extends SparkFunSuite {

From 84b42eeff4cdc6fc9ea2581580d2b45dbf7caa78 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Mon, 18 Apr 2016 20:57:36 +0200
Subject: [PATCH 2/7] Update params and tests

---
 .../apache/spark/ml/recommendation/ALS.scala  | 12 ++++++++++--
.../spark/ml/recommendation/ALSSuite.scala | 18 +++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 41bae53b8f60..7e77628647f9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -97,8 +97,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo * [[recommendFor]]. * @group param */ - val k = new Param[Int](this, "k", "number of recommendations to make. If set, calling " + - "'transform` will generate the top 'k' recommended items for each user or item, depending on" + + val k = new IntParam(this, "k", "number of recommendations to make. If set, calling " + + "'transform` will generate the top 'k' recommendations for each user or item, depending on" + " the value of 'recommendFor'.", ParamValidators.gt(0)) /** @group getParam */ @@ -541,6 +541,14 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] @Since("2.0.0") def setFinalStorageLevel(value: String): this.type = set(finalStorageLevel, value) + /** @group setParam */ + @Since("2.0.0") + def setK(value: Int): this.type = set(k, value) + + /** @group setParam */ + @Since("2.0.0") + def setRecommendFor(value: String): this.type = set(recommendFor, value) + /** * Sets both numUserBlocks and numItemBlocks to the specific value. * diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 0c8448465201..a4e4cf212c87 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -570,7 +570,7 @@ class ALSSuite ).toDF("item", "features", "expected") val userFactors = users.select("user", "features").withColumnRenamed("user", "id") val itemFactors = items.select("item", "features").withColumnRenamed("item", "id") - // construct model and check recommendations + // construct model val model = new ALSModel(als.uid, als.getRank, userFactors, itemFactors) .setUserCol("user") .setItemCol("item") @@ -585,8 +585,17 @@ class ALSSuite .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect() validateRecommendations(topKUsers) // check that using a subset of input only generates recommendations for that subset - val filteredTopK = model.recommendItems(users.filter(users("user") > 0), k) - assert(filteredTopK.count == 2) + assert(model.recommendItems(users.filter(users("user") > 0), k).count == 2) + assert(model.recommendUsers(items.filter(items("item") < 4), k).count == 1) + } + + test("invalid recommend params") { + intercept[IllegalArgumentException] { + val als = new ALS().setK(0) + } + intercept[IllegalArgumentException] { + val als = new ALS().setRecommendFor("foo") + } } } @@ -766,6 +775,9 @@ object ALSSuite extends Logging { "checkpointInterval" -> 20, "intermediateStorageLevel" -> "MEMORY_ONLY", "finalStorageLevel" -> "MEMORY_AND_DISK_SER" + "checkpointInterval" -> 20, + "k" -> 10, + "recommendFor" -> "user" ) // Helper functions to generate test data we share between ALS test suites From a12bede7317dccfe9b1eeb61010586b6b4fff878 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 20 Apr 2016 15:46:55 +0200 Subject: [PATCH 3/7] recommend all using transform --- .../apache/spark/ml/recommendation/ALS.scala | 140 
++++++++++++++-- .../spark/ml/recommendation/ALSSuite.scala | 154 +++++++++++++++--- 2 files changed, 252 insertions(+), 42 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 7e77628647f9..71588e35e920 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -40,7 +40,7 @@ import org.apache.spark.ml.util._ import org.apache.spark.mllib.linalg.CholeskyDecomposition import org.apache.spark.mllib.optimization.NNLS import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -115,6 +115,18 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo /** @group getParam */ def getRecommendFor: String = $(recommendFor) + + /** + * Param for whether to return predicted scores with recommendations. + * @group expertParam + */ + val withScores = new BooleanParam(this, "withScores", "whether to return predicted scores" + + " with recommendations.") + + /** @group expertGetParam */ + def getWithScores: Boolean = $(withScores) + + setDefault(withScores -> false) } /** @@ -242,6 +254,13 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w SchemaUtils.checkNumericType(schema, $(itemCol)) // rating will be cast to Float SchemaUtils.checkNumericType(schema, $(ratingCol)) + SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + val ratingType = schema($(ratingCol)).dataType + require(ratingType == FloatType || ratingType == DoubleType) + if (isSet(recommendFor) && !isSet(k)) { + throw new IllegalArgumentException("Parameter 'k' must be set when 'recommendFor' is set.") + } SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) } } @@ -281,9 +300,31 @@ class ALSModel private[ml] ( @Since("2.0.0") def setRecommendFor(value: String): this.type = set(recommendFor, value) + /** @group expertSetParam */ + @Since("2.0.0") + def setWithScores(value: Boolean): this.type = set(withScores, value) + @Since("1.3.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema) + transformSchema(dataset.schema) + if (isRecommendingTopK) { + $(recommendFor) match { + case "user" => recommendItems(dataset, $(k)) + case "item" => recommendUsers(dataset, $(k)) + } + } else { + predictForUserAndItem(dataset) + } + } + + /** + * Generate a predicted score for each (user, item) combination in the input dataset. + * + * @param dataset input dataset containing a user id and item id column. + * @return [[DataFrame]] with predicted score column appended. + */ + private def predictForUserAndItem(dataset: Dataset[_]): DataFrame = { // Register a UDF for DataFrame, and then // create a new column named map(predictionCol) by running the predict UDF. 
val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) => @@ -313,20 +354,60 @@ class ALSModel private[ml] ( * @return [[DataFrame]] containing `dataset` with a column `predictions` appended */ private def recommendUsersOrItems( - dataset: DataFrame, + dataset: Dataset[_], srcFactors: DataFrame, dstFactors: DataFrame, srcCol: String, + dstCol: String, num: Int): DataFrame = { + import dataset.sqlContext.implicits._ + val schema = dataset.schema + val factors = dataset - .join(srcFactors, dataset(srcCol) === srcFactors("id"), "left") + .select(srcCol) + .distinct + .join(srcFactors, dataset(srcCol) === srcFactors("id"), "inner") .select(srcFactors("id"), srcFactors("features")) - val topK = ALSModel.recommendForAll(rank, factors, dstFactors, num) - .toDF("id", "predictions") - dataset - .join(topK, dataset(srcCol) === topK("id")) - .select(dataset("*"), topK("predictions")) + + val topKRaw = ALSModel.recommendForAll(rank, factors, dstFactors, num) + val topK = if ($(withScores)) { + topKRaw.toDF("id", "predictions") + } else { + topKRaw + .map { case (id, predsWithScores) => (id, predsWithScores.map(_._1)) } + .toDF("id", "predictions") + } + + val result = if (schema.fieldNames.contains(dstCol)) { + // if dstCol exists, we group by that column to generate the 'ground truth' set of + // user (or item) ids. + // Note, this changes the structure of the input DataFrame, returning a row of + // (id, predictions, actual) per unique id in 'srcCol', discarding all other columns. + // In practice this is expected only during model selection with + // CrossValidator or TrainValidationSplit. + val actual = dataset + .select(srcCol, dstCol) + .as[(Int, Int)] + .groupByKey(_._1) + .mapGroups { case (src, ids) => (src, ids.map(_._2).toArray) } + .toDF("id", "actual") + + dataset + .select(srcCol) + .distinct + .join(topK, dataset(srcCol) === topK("id")) + .join(actual, dataset(srcCol) === actual("id")) + .select(dataset(srcCol), topK("predictions"), actual("actual")) + } else { + // if dstCol doesn't exist, we assume we are passed a DataFrame of unique user (or item) ids + // for which to make recommendations, and so we don't use distinct here. This preserves the + // row structure of the input data frame. + dataset + .join(topK, dataset(srcCol) === topK("id")) + .select(dataset("*"), topK("predictions")) + } + result } /** @@ -335,9 +416,8 @@ class ALSModel private[ml] ( * @param num number of items to recommend for each user. * @return input [[DataFrame]], with a column `predictions` appended. */ - @Since("2.0.0") - def recommendItems(dataset: DataFrame, num: Int): DataFrame = { - recommendUsersOrItems(dataset, userFactors, itemFactors, $(userCol), num) + private def recommendItems(dataset: Dataset[_], num: Int): DataFrame = { + recommendUsersOrItems(dataset, userFactors, itemFactors, $(userCol), $(itemCol), num) } /** @@ -346,17 +426,37 @@ class ALSModel private[ml] ( * @param num number of users to recommend for each item. * @return input [[DataFrame]], with a column `predictions` appended. 
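   *
   * Callers reach this via `transform`; a sketch of the equivalent invocation
   * (param names as defined in this patch):
   * {{{
   *   model.setRecommendFor("item").setK(num).transform(dataset)
   * }}}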
*/ - @Since("2.0.0") - def recommendUsers(dataset: DataFrame, num: Int): DataFrame = { - recommendUsersOrItems(dataset, itemFactors, userFactors, $(itemCol), num) + private def recommendUsers(dataset: Dataset[_], num: Int): DataFrame = { + recommendUsersOrItems(dataset, itemFactors, userFactors, $(itemCol), $(userCol), num) } + private def isRecommendingTopK: Boolean = isSet(recommendFor) && isSet(k) + @Since("1.3.0") override def transformSchema(schema: StructType): StructType = { // user and item will be cast to Int SchemaUtils.checkNumericType(schema, $(userCol)) SchemaUtils.checkNumericType(schema, $(itemCol)) SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) + if (isRecommendingTopK) { + $(recommendFor) match { + case "user" => + SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + if (schema.contains($(itemCol))) { + SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + } + case "item" => + SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + if (schema.contains($(userCol))) { + SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + } + } + SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(IntegerType, false)) + } else { + SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) + } } @Since("1.5.0") @@ -422,10 +522,10 @@ object ALSModel extends MLReadable[ALSModel] { * of (dstId, score) pairs. */ private def recommendForAll( - rank: Int, - srcFactors: DataFrame, - dstFactors: DataFrame, - num: Int): RDD[(Int, Array[(Int, Float)])] = { + rank: Int, + srcFactors: DataFrame, + dstFactors: DataFrame, + num: Int): RDD[(Int, Array[(Int, Float)])] = { import srcFactors.sqlContext.implicits._ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel @@ -549,6 +649,10 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] @Since("2.0.0") def setRecommendFor(value: String): this.type = set(recommendFor, value) + /** @group expertSetParam */ + @Since("2.0.0") + def setWithScores(value: Boolean): this.type = set(withScores, value) + /** * Sets both numUserBlocks and numItemBlocks to the specific value. * diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index a4e4cf212c87..17fa90426698 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -541,7 +541,18 @@ class ALSSuite } /** - * Validate expected vs actual predictions. + * Validate expected vs actual ids. 
+ * @param data + */ + private def validateRecommendedIds(data: Seq[(Seq[Int], Seq[Int])]) = { + data.foreach { case (expected, actual) => + assert(expected === actual) + + } + } + + /** + * Validate expected vs actual ids and scores * @param data */ private def validateRecommendations(data: Seq[(Seq[(Int, Float)], Seq[(Int, Float)])]) = { @@ -553,43 +564,137 @@ class ALSSuite } } - test("recommend top k") { + private def getALSModel = { val sqlContext = this.sqlContext import sqlContext.implicits._ + val userFactors = Seq( + (0, Array(6.0f, 4.0f)), + (1, Array(3.0f, 4.0f)), + (2, Array(3.0f, 6.0f)) + ).toDF("id", "features") + val itemFactors = Seq( + (3, Array(5.0f, 6.0f)), + (4, Array(6.0f, 2.0f)), + (5, Array(3.0f, 6.0f)) + ).toDF("id", "features") val als = new ALS().setRank(2) - val k = 2 + new ALSModel(als.uid, als.getRank, userFactors, itemFactors) + } + + test("recommend top k with no 'ground truth' column") { + val sqlContext = this.sqlContext + import sqlContext.implicits._ + val users = Seq( - (0, Array(6.0f, 4.0f), Array((3, 54.0f), (4, 44.0f))), - (1, Array(3.0f, 4.0f), Array((3, 39.0f), (5, 33.0f))), - (2, Array(3.0f, 6.0f), Array((3, 51.0f), (5, 45.0f))) - ).toDF("user", "features", "expected") + (0, Array((3, 54.0f), (4, 44.0f)), Array(3, 4)), + (1, Array((3, 39.0f), (5, 33.0f)), Array(3, 5)), + (2, Array((3, 51.0f), (5, 45.0f)), Array(3, 5)) + ).toDF("user", "expected_with_scores", "expected") val items = Seq( - (3, Array(5.0f, 6.0f), Array((0, 54.0f), (2, 51.0f))), - (4, Array(6.0f, 2.0f), Array((0, 44.0f), (2, 30.0f))), - (5, Array(3.0f, 6.0f), Array((2, 45.0f), (0, 42.0f))) - ).toDF("item", "features", "expected") - val userFactors = users.select("user", "features").withColumnRenamed("user", "id") - val itemFactors = items.select("item", "features").withColumnRenamed("item", "id") + (3, Array((0, 54.0f), (2, 51.0f)), Array(0, 2)), + (4, Array((0, 44.0f), (2, 30.0f)), Array(0, 2)), + (5, Array((2, 45.0f), (0, 42.0f)), Array(2, 0)) + ).toDF("item", "expected_with_scores", "expected") + // construct model - val model = new ALSModel(als.uid, als.getRank, userFactors, itemFactors) + val model = getALSModel .setUserCol("user") .setItemCol("item") - // validate user recommendations - val topKItems = model.recommendItems(users, k) - .select("expected", "predictions") + .setK(2) + + // validate recommendations for users, with scores + val topKItemWithScores = model.setWithScores(true).setRecommendFor("user").transform(users) + .select("expected_with_scores", "predictions") .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect() - validateRecommendations(topKItems) - // validate item recommendations - val topKUsers = model.recommendUsers(items, k) + validateRecommendations(topKItemWithScores) + // without scores + val topKItems = model.setWithScores(false).setRecommendFor("user").transform(users) .select("expected", "predictions") + .as[(Seq[Int], Seq[Int])].rdd.collect() + validateRecommendedIds(topKItems) + + // validate item recommendations, with scores + val topKUsersWithScores = model.setWithScores(true).setRecommendFor("item").transform(items) + .select("expected_with_scores", "predictions") .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect() - validateRecommendations(topKUsers) + validateRecommendations(topKUsersWithScores) + // without scores + val topKUsers = model.setWithScores(false).setRecommendFor("item").transform(items) + .select("expected", "predictions") + .as[(Seq[Int], Seq[Int])].rdd.collect() + validateRecommendedIds(topKUsers) + // check that using a 
subset of input only generates recommendations for that subset - assert(model.recommendItems(users.filter(users("user") > 0), k).count == 2) - assert(model.recommendUsers(items.filter(items("item") < 4), k).count == 1) + assert(model.setRecommendFor("user").transform(users.filter(users("user") > 0)).count == 2) + assert(model.setRecommendFor("item").transform(items.filter(items("item") < 4)).count == 1) + } + + test("recommend top k with 'ground truth' column") { + val sqlContext = this.sqlContext + import sqlContext.implicits._ + + // raw input in the same format as for ALS.fit() + val ratings = Seq( + (0, 3, 54f), + (0, 4, 44f), + (0, 5, 42f), + (1, 3, 39f), + (1, 5, 33f), + (1, 4, 26f), + (2, 3, 51f), + (2, 5, 45f), + (2, 4, 30f) + ).toDF("user", "item", "rating") + val expected = Map( + 0 -> Seq(3, 4, 5), + 1 -> Seq(3, 5, 4), + 2 -> Seq(3, 5, 4) + ) + + // construct model + val model = getALSModel + .setUserCol("user") + .setItemCol("item") + .setK(2) + + model.setRecommendFor("user").transform(ratings) + .as[(Int, Seq[Int], Seq[Int])] + .rdd + .collect + .foreach { case (id, pred, actual) => + assert(expected(id) === actual) + assert(expected(id).take(2) === pred) + } + } + + test("ALS fit -> recommend top k") { + val sqlContext = this.sqlContext + import sqlContext.implicits._ + val (ratings: RDD[Rating[Int]], _) = genExplicitTestData(numUsers = 5, numItems = 4, rank = 1) + val dataset = ratings.toDF + val als = new ALS() + .setK(2) + .setRecommendFor("user") + + val userId = ratings.first().user + val numItems = ratings.filter(_.user == userId).map(_.item).distinct().count() + val model = als.fit(dataset) + val result = model.transform(dataset) + assert(result.count == 5) + val userRow = result.select("predictions", "actual").where(result("user") === userId).first() + assert(userRow.getSeq[Int](0).length == 2) + assert(userRow.getSeq[Int](1).length == numItems) + } test("invalid recommend params") { + // TODO + // val sqlContext = this.sqlContext + // import sqlContext.implicits._ + // intercept[IllegalArgumentException] { + // val (ratings, _) = genExplicitTestData(numUsers = 2, numItems = 2, rank = 1) + // val als = new ALS().setRecommendFor("user").fit(ratings.toDF) + // } intercept[IllegalArgumentException] { val als = new ALS().setK(0) } @@ -777,7 +882,8 @@ object ALSSuite extends Logging { "finalStorageLevel" -> "MEMORY_AND_DISK_SER" "checkpointInterval" -> 20, "k" -> 10, - "recommendFor" -> "user" + "recommendFor" -> "user", + "withScores" -> true ) // Helper functions to generate test data we share between ALS test suites From 409c8a2a227f11eb6c9f17d9f6084c78a4d618a5 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 21 Apr 2016 08:54:32 +0200 Subject: [PATCH 4/7] remove space --- .../test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 17fa90426698..b1e9ab4b2ae6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -547,7 +547,6 @@ class ALSSuite private def validateRecommendedIds(data: Seq[(Seq[Int], Seq[Int])]) = { data.foreach { case (expected, actual) => assert(expected === actual) - } } From 71ae89db2a5a0feec3d1b1c5544a2ebeccaed4d7 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 21 Apr 2016 11:13:33 +0200 Subject: [PATCH 5/7] clean up and docs 
--- .../apache/spark/ml/recommendation/ALS.scala | 77 +++++++++++++------ .../MatrixFactorizationModel.scala | 2 +- .../spark/ml/recommendation/ALSSuite.scala | 26 ++++--- 3 files changed, 68 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 71588e35e920..6434a6e3f9aa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -51,7 +51,7 @@ import org.apache.spark.util.random.XORShiftRandom /** * Common params for ALS and ALSModel. */ -private[recommendation] trait ALSModelParams extends Params with HasPredictionCol { +private[recommendation] trait ALSModelParams extends Params with HasPredictionCol with HasLabelCol { /** * Param for the column name for user ids. Ids must be integers. Other * numeric types are supported for this column, but will be cast to integers as long as they @@ -258,8 +258,8 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) val ratingType = schema($(ratingCol)).dataType require(ratingType == FloatType || ratingType == DoubleType) - if (isSet(recommendFor) && !isSet(k)) { - throw new IllegalArgumentException("Parameter 'k' must be set when 'recommendFor' is set.") + if (isSet(recommendFor)) { + require(isSet(k), "Parameter 'k' must be set when 'recommendFor' is set.") } SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) } @@ -292,6 +292,10 @@ class ALSModel private[ml] ( @Since("1.3.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + /** @group setParam */ + @Since("2.0.0") + def setLabelCol(value: String): this.type = set(labelCol, value) + /** @group setParam */ @Since("2.0.0") def setK(value: Int): this.type = set(k, value) @@ -346,20 +350,26 @@ class ALSModel private[ml] ( /** * Generate top `num` recommended items for each user (or recommended users for each item) in the * input [[DataFrame]]. + * If `dstCol` exists in the input schema, then the result will contain the recommendations and + * the actual "ground truth" ids (taken from those ids present in `dstCol`). If `dstCol` doesn't + * exist, then the result will only contain recommendations. * @param dataset [[DataFrame]] containing a column of user or item ids for which to recommend. * @param srcFactors user / item factors for which to generate top-k recommendations. * @param dstFactors candidate user / item factors from which to generate top-k recommendations. - * @param srcCol name of column containing id for which to recommend. + * @param srcCol name of column containing ids for which to recommend. + * @param dstCol name of column containing ids used to generate the "ground truth" set (if the + * column exists). * @param num how many recommended items (or users) to compute for each user (or item) - * @return [[DataFrame]] containing `dataset` with a column `predictions` appended + * @return [[DataFrame]] containing recommendations (and "ground truth" set if `dstCol` exists) + * for each id in `srcCol`. 
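+   *
+   * A sketch of the two output shapes (columns follow `predictionCol` and `labelCol`;
+   * the names here assume the defaults "prediction" and "label"):
+   * {{{
+   *   // input has srcCol only:        all input columns ++ (prediction)
+   *   // input has srcCol and dstCol:  (srcCol, prediction, label)
+   * }}}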
*/ private def recommendUsersOrItems( - dataset: Dataset[_], - srcFactors: DataFrame, - dstFactors: DataFrame, - srcCol: String, - dstCol: String, - num: Int): DataFrame = { + dataset: Dataset[_], + srcFactors: DataFrame, + dstFactors: DataFrame, + srcCol: String, + dstCol: String, + num: Int): DataFrame = { import dataset.sqlContext.implicits._ val schema = dataset.schema @@ -367,25 +377,27 @@ class ALSModel private[ml] ( val factors = dataset .select(srcCol) .distinct - .join(srcFactors, dataset(srcCol) === srcFactors("id"), "inner") + .join(srcFactors, dataset(srcCol) === srcFactors("id"), "left") .select(srcFactors("id"), srcFactors("features")) val topKRaw = ALSModel.recommendForAll(rank, factors, dstFactors, num) val topK = if ($(withScores)) { - topKRaw.toDF("id", "predictions") + // with scores, 'predictions' is an Array((id1, score1), (id2, score2), ...) + topKRaw.toDF("id", "recommendations") } else { + // without scores, 'predictions' is an Array(id1, id2, ...) topKRaw .map { case (id, predsWithScores) => (id, predsWithScores.map(_._1)) } - .toDF("id", "predictions") + .toDF("id", "recommendations") } val result = if (schema.fieldNames.contains(dstCol)) { - // if dstCol exists, we group by that column to generate the 'ground truth' set of + // if 'dstCol' exists, we group by that column to generate the 'ground truth' set of // user (or item) ids. // Note, this changes the structure of the input DataFrame, returning a row of // (id, predictions, actual) per unique id in 'srcCol', discarding all other columns. // In practice this is expected only during model selection with - // CrossValidator or TrainValidationSplit. + // CrossValidator/TrainValidationSplit using RankingEvaluator. val actual = dataset .select(srcCol, dstCol) .as[(Int, Int)] @@ -398,14 +410,19 @@ class ALSModel private[ml] ( .distinct .join(topK, dataset(srcCol) === topK("id")) .join(actual, dataset(srcCol) === actual("id")) - .select(dataset(srcCol), topK("predictions"), actual("actual")) + .select( + dataset(srcCol), + topK("recommendations").as($(predictionCol)), + actual("actual").as($(labelCol)) + ) } else { - // if dstCol doesn't exist, we assume we are passed a DataFrame of unique user (or item) ids + // if 'dstCol' doesn't exist, we assume we are passed a DataFrame of unique user (or item) ids // for which to make recommendations, and so we don't use distinct here. This preserves the - // row structure of the input data frame. + // schema and structure of the input DataFrame (but doesn't handle duplicate input ids so will + // generate recommendations multiple times in this case). dataset - .join(topK, dataset(srcCol) === topK("id")) - .select(dataset("*"), topK("predictions")) + .join(topK, dataset(srcCol) === topK("id"), "left") + .select(dataset("*"), topK("recommendations").as($(predictionCol))) } result } @@ -414,7 +431,7 @@ class ALSModel private[ml] ( * Generate top `num` recommended items for each user id in the input [[DataFrame]]. * @param dataset input [[DataFrame]]. The user id column is set by [[userCol]]. * @param num number of items to recommend for each user. - * @return input [[DataFrame]], with a column `predictions` appended. + * @return [[DataFrame]] containing recommendations. */ private def recommendItems(dataset: Dataset[_], num: Int): DataFrame = { recommendUsersOrItems(dataset, userFactors, itemFactors, $(userCol), $(itemCol), num) @@ -424,7 +441,7 @@ class ALSModel private[ml] ( * Generate top `num` recommended users for each item in the input [[DataFrame]]. 
* @param dataset input [[DataFrame]]. The item id column is set by [[itemCol]]. * @param num number of users to recommend for each item. - * @return input [[DataFrame]], with a column `predictions` appended. + * @return [[DataFrame]] containing recommendations. */ private def recommendUsers(dataset: Dataset[_], num: Int): DataFrame = { recommendUsersOrItems(dataset, itemFactors, userFactors, $(itemCol), $(userCol), num) @@ -439,19 +456,25 @@ class ALSModel private[ml] ( SchemaUtils.checkNumericType(schema, $(itemCol)) SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) if (isRecommendingTopK) { - $(recommendFor) match { + val labelSchema = $(recommendFor) match { case "user" => SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) if (schema.contains($(itemCol))) { SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false)) + } else { + schema } case "item" => SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) if (schema.contains($(userCol))) { SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false)) + } else { + schema } } - SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(IntegerType, false)) + SchemaUtils.appendColumn(labelSchema, $(predictionCol), ArrayType(IntegerType, false)) } else { SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) @@ -641,6 +664,10 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel] @Since("2.0.0") def setFinalStorageLevel(value: String): this.type = set(finalStorageLevel, value) + /** @group setParam */ + @Since("2.0.0") + def setLabelCol(value: String): this.type = set(labelCol, value) + /** @group setParam */ @Since("2.0.0") def setK(value: Int): this.type = set(k, value) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index f7dd670ed3e5..4e925e4c014b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -333,7 +333,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { private def blockify[T: ClassTag]( rank: Int, features: RDD[(Int, Array[T])], - blockSize: Int = 4096): RDD[(Array[Int], Array[T])] = { + /* TODO make blockSize a param */blockSize: Int = 4096): RDD[(Array[Int], Array[T])] = { val blockStorage = rank * blockSize features.mapPartitions { iter => iter.grouped(blockSize).map { grouped => diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index b1e9ab4b2ae6..db833146bdce 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -542,6 +542,7 @@ class ALSSuite /** * Validate expected vs actual ids. 
+ * * @param data */ private def validateRecommendedIds(data: Seq[(Seq[Int], Seq[Int])]) = { @@ -552,6 +553,7 @@ class ALSSuite /** * Validate expected vs actual ids and scores + * * @param data */ private def validateRecommendations(data: Seq[(Seq[(Int, Float)], Seq[(Int, Float)])]) = { @@ -603,23 +605,23 @@ class ALSSuite // validate recommendations for users, with scores val topKItemWithScores = model.setWithScores(true).setRecommendFor("user").transform(users) - .select("expected_with_scores", "predictions") + .select("expected_with_scores", "prediction") .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect() validateRecommendations(topKItemWithScores) // without scores val topKItems = model.setWithScores(false).setRecommendFor("user").transform(users) - .select("expected", "predictions") + .select("expected", "prediction") .as[(Seq[Int], Seq[Int])].rdd.collect() validateRecommendedIds(topKItems) // validate item recommendations, with scores val topKUsersWithScores = model.setWithScores(true).setRecommendFor("item").transform(items) - .select("expected_with_scores", "predictions") + .select("expected_with_scores", "prediction") .as[(Seq[(Int, Float)], Seq[(Int, Float)])].rdd.collect() validateRecommendations(topKUsersWithScores) // without scores val topKUsers = model.setWithScores(false).setRecommendFor("item").transform(items) - .select("expected", "predictions") + .select("expected", "prediction") .as[(Seq[Int], Seq[Int])].rdd.collect() validateRecommendedIds(topKUsers) @@ -680,7 +682,7 @@ class ALSSuite val model = als.fit(dataset) val result = model.transform(dataset) assert(result.count == 5) - val userRow = result.select("predictions", "actual").where(result("user") === userId).first() + val userRow = result.select("prediction", "label").where(result("user") === userId).first() assert(userRow.getSeq[Int](0).length == 2) assert(userRow.getSeq[Int](1).length == numItems) @@ -688,12 +690,14 @@ class ALSSuite test("invalid recommend params") { // TODO - // val sqlContext = this.sqlContext - // import sqlContext.implicits._ - // intercept[IllegalArgumentException] { - // val (ratings, _) = genExplicitTestData(numUsers = 2, numItems = 2, rank = 1) - // val als = new ALS().setRecommendFor("user").fit(ratings.toDF) - // } + /* + val sqlContext = this.sqlContext + import sqlContext.implicits._ + intercept[IllegalArgumentException] { + val (ratings, _) = genExplicitTestData(numUsers = 2, numItems = 2, rank = 1) + val als = new ALS().setRecommendFor("user").fit(ratings.toDF) + } + */ intercept[IllegalArgumentException] { val als = new ALS().setK(0) } From 675f3fa12e27d2fd13429e901e596f83cff6c28c Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 25 Apr 2016 13:59:06 +0200 Subject: [PATCH 6/7] comment out transformSchema for now (see SPARK-14891) --- .../src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 6434a6e3f9aa..ec38e633e943 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -310,7 +310,6 @@ class ALSModel private[ml] ( @Since("1.3.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) transformSchema(dataset.schema) if (isRecommendingTopK) { $(recommendFor) match { From af432cc512517bfb27b4603a8b27d55dd4f8bea1 Mon Sep 17 00:00:00 2001 
From: Nick Pentreath Date: Fri, 29 Jul 2016 15:03:56 +0200 Subject: [PATCH 7/7] clean up from rebase --- .../apache/spark/ml/recommendation/ALS.scala | 18 ++++-------- .../spark/ml/recommendation/ALSSuite.scala | 29 ++++++++----------- 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index ec38e633e943..9aa1135652fd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -40,7 +40,7 @@ import org.apache.spark.ml.util._ import org.apache.spark.mllib.linalg.CholeskyDecomposition import org.apache.spark.mllib.optimization.NNLS import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -254,10 +254,6 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w SchemaUtils.checkNumericType(schema, $(itemCol)) // rating will be cast to Float SchemaUtils.checkNumericType(schema, $(ratingCol)) - SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) - SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) - val ratingType = schema($(ratingCol)).dataType - require(ratingType == FloatType || ratingType == DoubleType) if (isSet(recommendFor)) { require(isSet(k), "Parameter 'k' must be set when 'recommendFor' is set.") } @@ -450,14 +446,10 @@ class ALSModel private[ml] ( @Since("1.3.0") override def transformSchema(schema: StructType): StructType = { - // user and item will be cast to Int - SchemaUtils.checkNumericType(schema, $(userCol)) - SchemaUtils.checkNumericType(schema, $(itemCol)) - SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) if (isRecommendingTopK) { val labelSchema = $(recommendFor) match { case "user" => - SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) + SchemaUtils.checkNumericType(schema, $(userCol)) if (schema.contains($(itemCol))) { SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false)) @@ -465,7 +457,7 @@ class ALSModel private[ml] ( schema } case "item" => - SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + SchemaUtils.checkNumericType(schema, $(itemCol)) if (schema.contains($(userCol))) { SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false)) @@ -475,8 +467,8 @@ class ALSModel private[ml] ( } SchemaUtils.appendColumn(labelSchema, $(predictionCol), ArrayType(IntegerType, false)) } else { - SchemaUtils.checkColumnType(schema, $(userCol), IntegerType) - SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType) + SchemaUtils.checkNumericType(schema, $(userCol)) + SchemaUtils.checkNumericType(schema, $(itemCol)) SchemaUtils.appendColumn(schema, $(predictionCol), FloatType) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index db833146bdce..aaece9994bc4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -566,8 +566,8 @@ class ALSSuite } private def getALSModel = { - val sqlContext = 
this.sqlContext - import sqlContext.implicits._ + val spark = this.spark + import spark.implicits._ val userFactors = Seq( (0, Array(6.0f, 4.0f)), (1, Array(3.0f, 4.0f)), @@ -583,8 +583,8 @@ class ALSSuite } test("recommend top k with no 'ground truth' column") { - val sqlContext = this.sqlContext - import sqlContext.implicits._ + val spark = this.spark + import spark.implicits._ val users = Seq( (0, Array((3, 54.0f), (4, 44.0f)), Array(3, 4)), @@ -631,8 +631,8 @@ class ALSSuite } test("recommend top k with 'ground truth' column") { - val sqlContext = this.sqlContext - import sqlContext.implicits._ + val spark = this.spark + import spark.implicits._ // raw input in the same format as for ALS.fit() val ratings = Seq( @@ -669,13 +669,11 @@ class ALSSuite } test("ALS fit -> recommend top k") { - val sqlContext = this.sqlContext - import sqlContext.implicits._ + val spark = this.spark + import spark.implicits._ val (ratings: RDD[Rating[Int]], _) = genExplicitTestData(numUsers = 5, numItems = 4, rank = 1) val dataset = ratings.toDF - val als = new ALS() - .setK(2) - .setRecommendFor("user") + val als = new ALS().setK(2).setRecommendFor("user") val userId = ratings.first().user val numItems = ratings.filter(_.user == userId).map(_.item).distinct().count() @@ -689,15 +687,12 @@ class ALSSuite } test("invalid recommend params") { - // TODO - /* - val sqlContext = this.sqlContext - import sqlContext.implicits._ + val spark = this.spark + import spark.implicits._ intercept[IllegalArgumentException] { val (ratings, _) = genExplicitTestData(numUsers = 2, numItems = 2, rank = 1) val als = new ALS().setRecommendFor("user").fit(ratings.toDF) } - */ intercept[IllegalArgumentException] { val als = new ALS().setK(0) } @@ -882,7 +877,7 @@ object ALSSuite extends Logging { "nonnegative" -> true, "checkpointInterval" -> 20, "intermediateStorageLevel" -> "MEMORY_ONLY", - "finalStorageLevel" -> "MEMORY_AND_DISK_SER" + "finalStorageLevel" -> "MEMORY_AND_DISK_SER", "checkpointInterval" -> 20, "k" -> 10, "recommendFor" -> "user",