Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, collect_list, row_number, udf}
import org.apache.spark.sql.types.LongType

/**
* Evaluator for ranking.
*/
@Since("2.2.0")
@Experimental
final class RankingEvaluator @Since("2.2.0")(@Since("2.2.0") override val uid: String)
extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {

@Since("2.2.0")
def this() = this(Identifiable.randomUID("rankingEval"))

@Since("2.2.0")
val k = new IntParam(this, "k", "Top-K cutoff", (x: Int) => x > 0)

/** @group getParam */
@Since("2.2.0")
def getK: Int = $(k)

/** @group setParam */
@Since("2.2.0")
def setK(value: Int): this.type = set(k, value)

setDefault(k -> 1)

@Since("2.2.0")
val metricName: Param[String] = {
val allowedParams = ParamValidators.inArray(Array("mpr"))
new Param(this, "metricName", "metric name in evaluation (mpr)", allowedParams)
}

/** @group getParam */
@Since("2.2.0")
def getMetricName: String = $(metricName)

/** @group setParam */
@Since("2.2.0")
def setMetricName(value: String): this.type = set(metricName, value)

/** @group setParam */
@Since("2.2.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

/** @group setParam */
@Since("2.2.0")
def setLabelCol(value: String): this.type = set(labelCol, value)

/**
* Param for query column name.
* @group param
*/
val queryCol: Param[String] = new Param[String](this, "queryCol", "query column name")

setDefault(queryCol, "query")

/** @group getParam */
@Since("2.2.0")
def getQueryCol: String = $(queryCol)

/** @group setParam */
@Since("2.2.0")
def setQueryCol(value: String): this.type = set(queryCol, value)

setDefault(metricName -> "mpr")

@Since("2.2.0")
override def evaluate(dataset: Dataset[_]): Double = {
val schema = dataset.schema
SchemaUtils.checkNumericType(schema, $(predictionCol))
SchemaUtils.checkNumericType(schema, $(labelCol))
SchemaUtils.checkNumericType(schema, $(queryCol))

val w = Window.partitionBy(col($(queryCol))).orderBy(col($(predictionCol)).desc)

val topAtk: DataFrame = dataset
.na.drop("all", Seq($(predictionCol)))
.select(col($(predictionCol)), col($(labelCol)).cast(LongType), col($(queryCol)))
.withColumn("rn", row_number().over(w)).where(col("rn") <= $(k))
.drop("rn")
.groupBy(col($(queryCol)))
.agg(collect_list($(labelCol)).as("topAtk"))

val mapToEmptyArray_ = udf(() => Array.empty[Long])

val predictionAndLabels: DataFrame = dataset
.join(topAtk, Seq($(queryCol)), "outer")
.withColumn("topAtk", coalesce(col("topAtk"), mapToEmptyArray_()))
.select($(labelCol), "topAtk")
Copy link

@ebernhardson ebernhardson Apr 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we also need to run an aggregation on the label column, roughly the same as the previous aggregation but using labelCol as the sort instead of predictionCol?

Currently this generates a row per prediction, when ranking tasks should have a row per query. I think the aggregation should be run twice, then those two aggregations should be joined together on queryCol. That would result in a dataset containing (labels of top k predictions, top k actual labels)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. This is currently done in the previous step, when the topAtk Dataframe is calculated (line 101).

Unfortunately this is not compatible with RankingMetrics, which expects the format of predictionAndLabels as input. I didn't want to change RankingMetrics in this same PR.
So the predictionAndLabels DataFrame is calculated to use the same RankingMetrics from mllib (well, it is now UDFs based, but I didn't touched its logic).


val metrics = new RankingMetrics(predictionAndLabels, "topAtk", $(labelCol))
val metric = $(metricName) match {
case "mpr" => metrics.meanPercentileRank
}
metric
}

@Since("2.2.0")
override def isLargerBetter: Boolean = $(metricName) match {
case "mpr" => false
}

@Since("2.2.0")
override def copy(extra: ParamMap): RankingEvaluator = defaultCopy(extra)
}

@Since("2.2.0")
object RankingEvaluator extends DefaultParamsReadable[RankingEvaluator] {

@Since("2.2.0")
override def load(path: String): RankingEvaluator = super.load(path)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{mean, sum}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DoubleType

@Since("2.2.0")
class RankingMetrics(
predictionAndObservations: DataFrame, predictionCol: String, labelCol: String)
extends Logging with Serializable {

/**
* Compute the Mean Percentile Rank (MPR) of all the queries.
*
* See the following paper for detail ("Expected percentile rank" in the paper):
* Hu, Y., Y. Koren, and C. Volinsky. “Collaborative Filtering for Implicit Feedback Datasets.”
* In 2008 Eighth IEEE International Conference on Data Mining, 263–72, 2008.
* doi:10.1109/ICDM.2008.22.
*
* @return the mean percentile rank
*/
lazy val meanPercentileRank: Double = {

def rank = udf((predicted: Seq[Any], actual: Any) => {
val l_i = predicted.indexOf(actual)

if (l_i == -1) {
1
} else {
l_i.toDouble / predicted.size
}
}, DoubleType)

val R_prime = predictionAndObservations.count()
Copy link

@bantmen bantmen Oct 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a sum instead of count?
(I know this is old/closed but other people might be referring to this code)

val predictionColumn: Column = predictionAndObservations.col(predictionCol)
val labelColumn: Column = predictionAndObservations.col(labelCol)

val rankSum: Double = predictionAndObservations
.withColumn("rank", rank(predictionColumn, labelColumn))
.agg(sum("rank")).first().getDouble(0)

rankSum / R_prime
}

/**
* Compute the average precision of all the queries, truncated at ranking position k.
*
* If for a query, the ranking algorithm returns n (n is less than k) results, the precision
* value will be computed as #(relevant items retrieved) / k. This formula also applies when
* the size of the ground truth set is less than k.
*
* If a query has an empty ground truth set, zero will be used as precision together with
* a log warning.
*
* See the following paper for detail:
*
* IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
*
* @param k the position to compute the truncated precision, must be positive
* @return the average precision at the first k ranking positions
*/
@Since("2.2.0")
def precisionAt(k: Int): Double = {
require(k > 0, "ranking position k should be positive")

def precisionAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
val actualSet = actual.toSet
if (actualSet.nonEmpty) {
val n = math.min(predicted.length, k)
var i = 0
var cnt = 0
while (i < n) {
if (actualSet.contains(predicted(i))) {
cnt += 1
}
i += 1
}
cnt.toDouble / k
} else {
logWarning("Empty ground truth set, check input data")
0.0
}
}, DoubleType)

val predictionColumn: Column = predictionAndObservations.col(predictionCol)
val labelColumn: Column = predictionAndObservations.col(labelCol)

predictionAndObservations
.withColumn("predictionAtK", precisionAtK(predictionColumn, labelColumn))
.agg(mean("predictionAtK")).first().getDouble(0)
}

/**
* Returns the mean average precision (MAP) of all the queries.
* If a query has an empty ground truth set, the average precision will be zero and a log
* warning is generated.
*/
lazy val meanAveragePrecision: Double = {

def map = udf((predicted: Seq[Any], actual: Seq[Any]) => {
val actualSet = actual.toSet
if (actualSet.nonEmpty) {
var i = 0
var cnt = 0
var precSum = 0.0
val n = predicted.length
while (i < n) {
if (actualSet.contains(predicted(i))) {
cnt += 1
precSum += cnt.toDouble / (i + 1)
}
i += 1
}
precSum / actualSet.size
} else {
logWarning("Empty ground truth set, check input data")
0.0
}
}, DoubleType)

val predictionColumn: Column = predictionAndObservations.col(predictionCol)
val labelColumn: Column = predictionAndObservations.col(labelCol)

predictionAndObservations
.withColumn("MAP", map(predictionColumn, labelColumn))
.agg(mean("MAP")).first().getDouble(0)
}

/**
* Compute the average NDCG value of all the queries, truncated at ranking position k.
* The discounted cumulative gain at position k is computed as:
* sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1),
* and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current
* implementation, the relevance value is binary.

* If a query has an empty ground truth set, zero will be used as ndcg together with
* a log warning.
*
* See the following paper for detail:
*
* IR evaluation methods for retrieving highly relevant documents. K. Jarvelin and J. Kekalainen
*
* @param k the position to compute the truncated ndcg, must be positive
* @return the average ndcg at the first k ranking positions
*/
@Since("2.2.0")
def ndcgAt(k: Int): Double = {
require(k > 0, "ranking position k should be positive")

def ndcgAtK = udf((predicted: Seq[Any], actual: Seq[Any]) => {
val actualSet = actual.toSet

if (actualSet.nonEmpty) {
val labSetSize = actualSet.size
val n = math.min(math.max(predicted.length, labSetSize), k)
var maxDcg = 0.0
var dcg = 0.0
var i = 0
while (i < n) {
val gain = 1.0 / math.log(i + 2)
if (i < predicted.length && actualSet.contains(predicted(i))) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right, there is no overlap between the calculation of dcg and max_dcg. The question asked here should be if the label at predicted(i) is "good". When treating the labels as binary relevant/not relevant I suppose that might use a threshold, but better would be to move away from a binary dcg and use the full equation from the docblock. I understand though that you are not looking to make major updates to the code from mllib, so it would probably be reasonable for someone to fix this in a followup.

Copy link
Author

@daniloascione daniloascione Apr 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be fixed in another PR to keep changes isolated. FYI, the original JIRA for this is here.

dcg += gain
}
if (i < labSetSize) {
maxDcg += gain
}
i += 1
}
dcg / maxDcg
} else {
logWarning("Empty ground truth set, check input data")
0.0
}
}, DoubleType)

val predictionColumn: Column = predictionAndObservations.col(predictionCol)
val labelColumn: Column = predictionAndObservations.col(labelCol)

predictionAndObservations
.withColumn("ndcgAtK", ndcgAtK(predictionColumn, labelColumn))
.agg(mean("ndcgAtK")).first().getDouble(0)
}
}
Loading