249 changes: 244 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -51,7 +51,7 @@ import org.apache.spark.util.random.XORShiftRandom
/**
* Common params for ALS and ALSModel.
*/
private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
private[recommendation] trait ALSModelParams extends Params with HasPredictionCol with HasLabelCol {
/**
* Param for the column name for user ids. Ids must be integers. Other
* numeric types are supported for this column, but will be cast to integers as long as they
@@ -90,6 +90,43 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
n.toInt
}
}

/**
* Param for number of recommendations to make. If set, calling [[ALSModel.transform()]] will
* generate the top `k` recommendations for each user or item, depending on the value of
* [[recommendFor]].
* @group param
*/
val k = new IntParam(this, "k", "number of recommendations to make. If set, calling " +
"'transform' will generate the top 'k' recommendations for each user or item, depending on" +
" the value of 'recommendFor'.", ParamValidators.gt(0))

/** @group getParam */
def getK: Int = $(k)

/**
* Param for whether to generate recommendations for users or items.
* Supported values: `user`, `item`. If set, [[k]] must also be set.
* @group param
*/
val recommendFor = new Param[String](this, "recommendFor", "whether to generate " +
"recommendations for users or items. Supported values: 'user', 'item'. " +
"If set, 'k' must also be set.", ParamValidators.inArray(Array("user", "item")))

/** @group getParam */
def getRecommendFor: String = $(recommendFor)

/**
* Param for whether to return predicted scores with recommendations.
* @group expertParam
*/
val withScores = new BooleanParam(this, "withScores", "whether to return predicted scores" +
" with recommendations.")

/** @group expertGetParam */
def getWithScores: Boolean = $(withScores)

setDefault(withScores -> false)
}
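For context, a minimal usage sketch of the new params (hedged: 'model' and 'ratings' are assumed names for a fitted ALSModel and an input DataFrame; this snippet is not part of the diff):

// Illustrative only: recommend the top 10 items for each user in 'ratings',
// attaching predicted scores, via the setters added by this PR.
val topKRecs = model
  .setRecommendFor("user")  // or "item" to recommend users for each item
  .setK(10)                 // required whenever 'recommendFor' is set
  .setWithScores(true)      // return (id, score) pairs instead of bare ids
  .transform(ratings)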

/**
@@ -217,6 +254,9 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w
SchemaUtils.checkNumericType(schema, $(itemCol))
// rating will be cast to Float
SchemaUtils.checkNumericType(schema, $(ratingCol))
if (isSet(recommendFor)) {
require(isSet(k), "Parameter 'k' must be set when 'recommendFor' is set.")
}
SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
}
}
@@ -248,9 +288,42 @@ class ALSModel private[ml] (
@Since("1.3.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

/** @group setParam */
@Since("2.0.0")
def setLabelCol(value: String): this.type = set(labelCol, value)

/** @group setParam */
@Since("2.0.0")
def setK(value: Int): this.type = set(k, value)

/** @group setParam */
@Since("2.0.0")
def setRecommendFor(value: String): this.type = set(recommendFor, value)

/** @group expertSetParam */
@Since("2.0.0")
def setWithScores(value: Boolean): this.type = set(withScores, value)

@Since("1.3.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema)
if (isRecommendingTopK) {
$(recommendFor) match {
case "user" => recommendItems(dataset, $(k))
case "item" => recommendUsers(dataset, $(k))
}
} else {
predictForUserAndItem(dataset)
}
}

/**
* Generate a predicted score for each (user, item) combination in the input dataset.
*
* @param dataset input dataset containing a user id and item id column.
* @return [[DataFrame]] with predicted score column appended.
*/
private def predictForUserAndItem(dataset: Dataset[_]): DataFrame = {
// Register a UDF for DataFrame, and then
// create a new column named map(predictionCol) by running the predict UDF.
val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
@@ -269,12 +342,135 @@ class ALSModel private[ml] (
predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
}
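The diff view elides the body of the predict UDF above. A hedged reconstruction of what it computes (a dot product of the factor vectors, NaN when either side is missing); this is a sketch, not the verbatim source:

// Hedged sketch of the elided UDF, not the shipped code.
import org.apache.spark.sql.functions.udf

val predictSketch = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
  if (userFeatures != null && itemFeatures != null) {
    var score = 0.0f
    var i = 0
    while (i < userFeatures.length) {  // dot product of user and item factors
      score += userFeatures(i) * itemFeatures(i)
      i += 1
    }
    score
  } else {
    Float.NaN  // cold-start: user or item unseen during training
  }
}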

/**
* Generate top `num` recommended items for each user (or recommended users for each item) in the
* input [[DataFrame]].
* If `dstCol` exists in the input schema, then the result will contain the recommendations and
* the actual "ground truth" ids (taken from those ids present in `dstCol`). If `dstCol` doesn't
* exist, then the result will only contain recommendations.
* @param dataset [[DataFrame]] containing a column of user or item ids for which to recommend.
* @param srcFactors user / item factors for which to generate top-k recommendations.
* @param dstFactors candidate user / item factors from which to generate top-k recommendations.
* @param srcCol name of column containing ids for which to recommend.
* @param dstCol name of column containing ids used to generate the "ground truth" set (if the
* column exists).
* @param num how many recommended items (or users) to compute for each user (or item)
* @return [[DataFrame]] containing recommendations (and "ground truth" set if `dstCol` exists)
* for each id in `srcCol`.
*/
private def recommendUsersOrItems(
dataset: Dataset[_],
srcFactors: DataFrame,
dstFactors: DataFrame,
srcCol: String,
dstCol: String,
num: Int): DataFrame = {

import dataset.sqlContext.implicits._
val schema = dataset.schema

val factors = dataset
.select(srcCol)
.distinct
.join(srcFactors, dataset(srcCol) === srcFactors("id"), "left")
.select(srcFactors("id"), srcFactors("features"))

val topKRaw = ALSModel.recommendForAll(rank, factors, dstFactors, num)
val topK = if ($(withScores)) {
// with scores, 'recommendations' is an Array((id1, score1), (id2, score2), ...)
topKRaw.toDF("id", "recommendations")
} else {
// without scores, 'recommendations' is an Array(id1, id2, ...)
topKRaw
.map { case (id, predsWithScores) => (id, predsWithScores.map(_._1)) }
.toDF("id", "recommendations")
}

val result = if (schema.fieldNames.contains(dstCol)) {
// if 'dstCol' exists, we group by that column to generate the 'ground truth' set of
// user (or item) ids.
// Note, this changes the structure of the input DataFrame, returning a row of
// (id, predictions, actual) per unique id in 'srcCol', discarding all other columns.
// In practice this is expected only during model selection with
// CrossValidator/TrainValidationSplit using RankingEvaluator.
val actual = dataset
.select(srcCol, dstCol)
.as[(Int, Int)]
.groupByKey(_._1)
.mapGroups { case (src, ids) => (src, ids.map(_._2).toArray) }
.toDF("id", "actual")

dataset
.select(srcCol)
.distinct
.join(topK, dataset(srcCol) === topK("id"))
.join(actual, dataset(srcCol) === actual("id"))
.select(
(review thread)
frreiss (Contributor), Apr 22, 2016:
These two joins and distinct should probably be a cogroup.mapGroups, since you know that id is a key and dataset is unlikely to contain many duplicates. As far as I can see, the current Catalyst rules will generate a conservative plan with three stages for the expression distinct.join.join, since there's no information available on key constraints.
EDIT: On second thought, there's currently no cogroup operator in the DataFrame API, so I guess the way this expression is currently written is the best one can do.

Author (Contributor):
I'll add a note to look into optimizing this when cogroup exists.
By the way, when you say that dataset is unlikely to contain many duplicates, are you referring to duplicate rows? If so, yes that is true, as generally each (user, item, rating) tuple is distinct (though it's not guaranteed to be the case).

frreiss (Contributor):
Sorry, I meant duplicate values of srcCol that are filtered out by the first distinct. What I was trying to say was that, if a cogroup was used here, every group would fit handily in memory.

Author (Contributor):
Ah, ok. Yes, that should generally be true (though in larger-scale, social-network-style domains, one could run into "super-node" problems with highly popular users or items).
(end review thread)
dataset(srcCol),
topK("recommendations").as($(predictionCol)),
actual("actual").as($(labelCol))
)
} else {
// if 'dstCol' doesn't exist, we assume we are passed a DataFrame of unique user (or item)
// ids for which to make recommendations, so we don't use distinct here. This preserves the
// schema and structure of the input DataFrame (but doesn't deduplicate input ids, so
// duplicate ids will each receive a copy of the recommendations).
dataset
.join(topK, dataset(srcCol) === topK("id"), "left")
.select(dataset("*"), topK("recommendations").as($(predictionCol)))
}
result
}
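Following up on the review thread above about replacing 'distinct' plus two joins with a cogroup: a hypothetical RDD-level sketch (the Dataset API had no cogroup operator at the time), assuming both sides are keyed by the source id:

// Hypothetical alternative to distinct + two joins, per the review discussion.
// Each group holds at most one recommendations array and one ground-truth array,
// so groups fit comfortably in memory.
import org.apache.spark.rdd.RDD

def joinRecsWithActual(
    topK: RDD[(Int, Array[Int])],
    actual: RDD[(Int, Array[Int])]): RDD[(Int, Array[Int], Array[Int])] = {
  topK.cogroup(actual).flatMap { case (id, (recs, truth)) =>
    for (r <- recs.headOption; t <- truth.headOption) yield (id, r, t)
  }
}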

/**
* Generate top `num` recommended items for each user id in the input [[DataFrame]].
* @param dataset input [[DataFrame]]. The user id column is set by [[userCol]].
* @param num number of items to recommend for each user.
* @return [[DataFrame]] containing recommendations.
*/
private def recommendItems(dataset: Dataset[_], num: Int): DataFrame = {
recommendUsersOrItems(dataset, userFactors, itemFactors, $(userCol), $(itemCol), num)
}

/**
* Generate top `num` recommended users for each item in the input [[DataFrame]].
* @param dataset input [[DataFrame]]. The item id column is set by [[itemCol]].
* @param num number of users to recommend for each item.
* @return [[DataFrame]] containing recommendations.
*/
private def recommendUsers(dataset: Dataset[_], num: Int): DataFrame = {
recommendUsersOrItems(dataset, itemFactors, userFactors, $(itemCol), $(userCol), num)
}

private def isRecommendingTopK: Boolean = isSet(recommendFor) && isSet(k)

@Since("1.3.0")
override def transformSchema(schema: StructType): StructType = {
// user and item will be cast to Int
SchemaUtils.checkNumericType(schema, $(userCol))
SchemaUtils.checkNumericType(schema, $(itemCol))
SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
if (isRecommendingTopK) {
val labelSchema = $(recommendFor) match {
case "user" =>
SchemaUtils.checkNumericType(schema, $(userCol))
if (schema.fieldNames.contains($(itemCol))) {
SchemaUtils.checkColumnType(schema, $(itemCol), IntegerType)
SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false))
} else {
schema
}
case "item" =>
SchemaUtils.checkNumericType(schema, $(itemCol))
if (schema.fieldNames.contains($(userCol))) {
SchemaUtils.checkColumnType(schema, $(userCol), IntegerType)
SchemaUtils.appendColumn(schema, $(labelCol), ArrayType(IntegerType, false))
} else {
schema
}
}
SchemaUtils.appendColumn(labelSchema, $(predictionCol), ArrayType(IntegerType, false))
} else {
SchemaUtils.checkNumericType(schema, $(userCol))
SchemaUtils.checkNumericType(schema, $(itemCol))
SchemaUtils.appendColumn(schema, $(predictionCol), FloatType)
}
}

@Since("1.5.0")
@@ -328,6 +524,33 @@ object ALSModel extends MLReadable[ALSModel] {
model
}
}

/**
* Makes recommendations for all users (or items).
*
* @param rank the dimension of the factor vectors
* @param srcFactors factors of the users (or items) for which to generate recommendations
* @param dstFactors factors of the candidate items (or users) to recommend from
* @param num number of recommendations for each user (or item)
* @return an RDD of (srcId, recommendations) pairs, where recommendations are stored as an array
* of (dstId, score) pairs.
*/
private def recommendForAll(
rank: Int,
srcFactors: DataFrame,
dstFactors: DataFrame,
num: Int): RDD[(Int, Array[(Int, Float)])] = {
import srcFactors.sqlContext.implicits._
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

val srcFactorsRDD = srcFactors.as[(Int, Array[Float])].rdd
val dstFactorsRDD = dstFactors.as[(Int, Array[Float])].rdd
MatrixFactorizationModel.recommendForAll(rank, srcFactorsRDD, dstFactorsRDD, num)
}
}
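For intuition, a naive reference version of what recommendForAll computes; the shipped code delegates to MatrixFactorizationModel.recommendForAll, which blocks the factor matrices for efficiency. This sketch is illustrative only:

// Naive O(|src| x |dst|) sketch: score every pair by dot product, keep the top
// 'num' per source id. Quadratic, so only suitable as a reference implementation.
import org.apache.spark.rdd.RDD

def recommendForAllNaive(
    srcFactors: RDD[(Int, Array[Float])],
    dstFactors: RDD[(Int, Array[Float])],
    num: Int): RDD[(Int, Array[(Int, Float)])] = {
  srcFactors.cartesian(dstFactors)
    .map { case ((srcId, srcVec), (dstId, dstVec)) =>
      var dot = 0.0f
      var i = 0
      while (i < srcVec.length) { dot += srcVec(i) * dstVec(i); i += 1 }
      (srcId, (dstId, dot))
    }
    .groupByKey()
    .mapValues(_.toArray.sortBy(-_._2).take(num))
}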

object ALSModelUtil {

}

/**
@@ -432,6 +655,22 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
@Since("2.0.0")
def setFinalStorageLevel(value: String): this.type = set(finalStorageLevel, value)

/** @group setParam */
@Since("2.0.0")
def setLabelCol(value: String): this.type = set(labelCol, value)

/** @group setParam */
@Since("2.0.0")
def setK(value: Int): this.type = set(k, value)

/** @group setParam */
@Since("2.0.0")
def setRecommendFor(value: String): this.type = set(recommendFor, value)
(review thread)
Member:
Would it make sense to set recommendFor and k together? Like this:

def setRecommendFor(forval: String, kval: Int)

That way you would be guaranteed they are both set if using recommendFor.

Author (Contributor):
That's interesting. I like the idea, though no other estimator/model uses this approach (but AFAIK there's no other estimator where 2 specific non-default params must be set together). @jkbradley @holdenk thoughts?
(end review thread)
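A minimal sketch of the combined setter the reviewer proposes (hypothetical; not part of the PR as written):

// Hypothetical combined setter: guarantees 'k' is set whenever 'recommendFor' is.
def setRecommendFor(forval: String, kval: Int): this.type = {
  set(recommendFor, forval)
  set(k, kval)
}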


/** @group expertSetParam */
@Since("2.0.0")
def setWithScores(value: Boolean): this.type = set(withScores, value)

/**
* Sets both numUserBlocks and numItemBlocks to the specific value.
*