From 8d31e1e6c988951bfc0a6e70239e4468ae6fbc7b Mon Sep 17 00:00:00 2001
From: Ilya Matiach
Date: Mon, 27 Feb 2017 13:10:29 -0500
Subject: [PATCH 01/12] Added weight column for binary classification evaluator

---
 .../BinaryClassificationEvaluator.scala       | 26 ++++++++++----
 .../BinaryClassificationMetrics.scala         | 34 ++++++++++++++-----
 .../binary/BinaryConfusionMatrix.scala        | 24 ++++++-------
 .../binary/BinaryLabelCounter.scala           | 14 ++++++--
 .../BinaryClassificationEvaluatorSuite.scala  | 27 +++++++++++++++
 .../BinaryClassificationMetricsSuite.scala    | 28 +++++++++++++++
 6 files changed, 123 insertions(+), 30 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index bff72b20e1c3..fc7cdd0b3042 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -36,11 +36,17 @@ import org.apache.spark.sql.types.DoubleType
 @Since("1.2.0")
 @Experimental
 class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override val uid: String)
-  extends Evaluator with HasRawPredictionCol with HasLabelCol with DefaultParamsWritable {
+  extends Evaluator with HasRawPredictionCol with HasLabelCol
+    with HasWeightCol with DefaultParamsWritable {
 
   @Since("1.2.0")
   def this() = this(Identifiable.randomUID("binEval"))
 
+  /**
+   * Default number of bins to use for binary classification evaluation.
+   */
+  val defaultNumberOfBins = 1000
+
   /**
    * param for metric name in evaluation (supports `"areaUnderROC"` (default), `"areaUnderPR"`)
    * @group param
@@ -68,6 +74,10 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
   @Since("1.2.0")
   def setLabelCol(value: String): this.type = set(labelCol, value)
 
+  /** @group setParam */
+  @Since("2.2.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+
   setDefault(metricName -> "areaUnderROC")
 
   @Since("2.0.0")
@@ -77,12 +87,16 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
     SchemaUtils.checkNumericType(schema, $(labelCol))
     // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
 
-    val scoreAndLabels =
-      dataset.select(col($(rawPredictionCol)), col($(labelCol)).cast(DoubleType)).rdd.map {
-        case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
-        case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
+    val scoreAndLabelsWithWeights =
+      dataset.select(col($(rawPredictionCol)), col($(labelCol)).cast(DoubleType),
+        if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)))
+        .rdd.map {
+        case Row(rawPrediction: Vector, label: Double, weight: Double) =>
+          (rawPrediction(1), (label, weight))
+        case Row(rawPrediction: Double, label: Double, weight: Double) =>
+          (rawPrediction, (label, weight))
       }
-    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
+    val metrics = new BinaryClassificationMetrics(defaultNumberOfBins, scoreAndLabelsWithWeights)
     val metric = $(metricName) match {
       case "areaUnderROC" => metrics.areaUnderROC()
       case "areaUnderPR" => metrics.areaUnderPR()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 2cfcf38eb4ca..0cd7d088739e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.DataFrame
 /**
  * Evaluator for binary classification.
  *
- * @param scoreAndLabels an RDD of (score, label) pairs.
+ * @param scoreAndLabelsWithWeights an RDD of (score, (label, weight)) pairs.
  * @param numBins if greater than 0, then the curves (ROC curve, PR curve) computed internally
  *                will be down-sampled to this many "bins". If 0, no down-sampling will occur.
  *                This is useful because the curve contains a point for each distinct score
  *                in the input, and this could be as large as the input itself -- millions of
  *                points or more, when thousands may be entirely sufficient to summarize
  *                the curve.
  *                After down-sampling, the curves will instead be made of approximately
  *                `numBins` points instead. Points are made from bins of equal numbers of
  *                consecutive points. The size of each bin is `floor(scoreAndLabels.count() / numBins)`,
  *                which means the resulting number of bins may not exactly equal numBins. The last bin
  *                in each partition may be smaller as a result, meaning there may be an extra sample at
  *                partition boundaries.
  */
 @Since("1.0.0")
-class BinaryClassificationMetrics @Since("1.3.0") (
-    @Since("1.3.0") val scoreAndLabels: RDD[(Double, Double)],
-    @Since("1.3.0") val numBins: Int) extends Logging {
+class BinaryClassificationMetrics @Since("2.2.0") (
+    val numBins: Int,
+    @Since("2.2.0") val scoreAndLabelsWithWeights: RDD[(Double, (Double, Double))])
+  extends Logging {
 
   require(numBins >= 0, "numBins must be nonnegative")
 
+  /**
+   * Retrieves the score and labels (for binary compatibility).
+   * @return The score and labels.
+   */
+  @Since("1.0.0")
+  def scoreAndLabels: RDD[(Double, Double)] = {
+    scoreAndLabelsWithWeights.map(values => (values._1, values._2._1))
+  }
+
+  @Since("1.0.0")
+  def this(@Since("1.3.0") scoreAndLabels: RDD[(Double, Double)], @Since("1.3.0") numBins: Int) =
+    this(numBins, scoreAndLabels.map(scoreAndLabel => (scoreAndLabel._1, (scoreAndLabel._2, 1.0))))
+
   /**
    * Defaults `numBins` to 0.
    */
@@ -146,11 +160,13 @@ class BinaryClassificationMetrics @Since("1.3.0") (
   private lazy val (
     cumulativeCounts: RDD[(Double, BinaryLabelCounter)],
     confusions: RDD[(Double, BinaryConfusionMatrix)]) = {
-    // Create a bin for each distinct score value, count positives and negatives within each bin,
-    // and then sort by score values in descending order.
-    val counts = scoreAndLabels.combineByKey(
-      createCombiner = (label: Double) => new BinaryLabelCounter(0L, 0L) += label,
-      mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
+    // Create a bin for each distinct score value, count weighted positives and
+    // negatives within each bin, and then sort by score values in descending order.
+    val counts = scoreAndLabelsWithWeights.combineByKey(
+      createCombiner = (labelAndWeight: (Double, Double)) =>
+        new BinaryLabelCounter(0L, 0L) += (labelAndWeight._1, labelAndWeight._2),
+      mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) =>
+        c += (labelAndWeight._1, labelAndWeight._2),
       mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
     ).sortByKey(ascending = false)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
index 559c6ef7e725..1c0130700421 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala
@@ -22,22 +22,22 @@ package org.apache.spark.mllib.evaluation.binary
  */
 private[evaluation] trait BinaryConfusionMatrix {
   /** number of true positives */
-  def numTruePositives: Long
+  def numTruePositives: Double
 
   /** number of false positives */
-  def numFalsePositives: Long
+  def numFalsePositives: Double
 
   /** number of false negatives */
-  def numFalseNegatives: Long
+  def numFalseNegatives: Double
 
   /** number of true negatives */
-  def numTrueNegatives: Long
+  def numTrueNegatives: Double
 
   /** number of positives */
-  def numPositives: Long = numTruePositives + numFalseNegatives
+  def numPositives: Double = numTruePositives + numFalseNegatives
 
   /** number of negatives */
-  def numNegatives: Long = numFalsePositives + numTrueNegatives
+  def numNegatives: Double = numFalsePositives + numTrueNegatives
 }
 
 /**
@@ -51,20 +51,20 @@ private[evaluation] case class BinaryConfusionMatrixImpl(
     totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix {
 
   /** number of true positives */
-  override def numTruePositives: Long = count.numPositives
+  override def numTruePositives: Double = count.numPositives
 
   /** number of false positives */
-  override def numFalsePositives: Long = count.numNegatives
+  override def numFalsePositives: Double = count.numNegatives
 
   /** number of false negatives */
-  override def numFalseNegatives: Long = totalCount.numPositives - count.numPositives
+  override def numFalseNegatives: Double = totalCount.numPositives - count.numPositives
 
   /** number of true negatives */
-  override def numTrueNegatives: Long = totalCount.numNegatives - count.numNegatives
+  override def numTrueNegatives: Double = totalCount.numNegatives - count.numNegatives
 
   /** number of positives */
-  override def numPositives: Long = totalCount.numPositives
+  override def numPositives: Double = totalCount.numPositives
 
   /** number of negatives */
-  override def numNegatives: Long = totalCount.numNegatives
+  override def numNegatives: Double = totalCount.numNegatives
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala
index 1e610c20092a..eb38af748ad9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala
@@ -24,14 +24,22 @@ package org.apache.spark.mllib.evaluation.binary
  * @param numNegatives number of negative labels
  */
 private[evaluation] class BinaryLabelCounter(
-    var numPositives: Long = 0L,
-    var numNegatives: Long = 0L) extends Serializable {
+    var numPositives: Double = 0.0,
+    var numNegatives: Double = 0.0) extends Serializable {
 
   /** Processes a label. */
   def +=(label: Double): BinaryLabelCounter = {
     // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle
     // -1.0 for negative as well.
-    if (label > 0.5) numPositives += 1L else numNegatives += 1L
+    if (label > 0.5) numPositives += 1.0 else numNegatives += 1.0
+    this
+  }
+
+  /** Processes a label with a weight. */
+  def +=(label: Double, weight: Double): BinaryLabelCounter = {
+    // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle
+    // -1.0 for negative as well.
+    if (label > 0.5) numPositives += weight else numNegatives += weight
     this
   }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
index 2b0909acf69c..1bd2db79573d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala
@@ -71,6 +71,33 @@ class BinaryClassificationEvaluatorSuite
     assert(thrown.getMessage.replace("\n", "") contains "but was actually of type string.")
   }
 
+  test("should accept weight column") {
+    val weightCol = "weight"
+    // get metric with weight column
+    val evaluator = new BinaryClassificationEvaluator()
+      .setMetricName("areaUnderROC").setWeightCol(weightCol)
+    val vectorDF = Seq(
+      (0d, Vectors.dense(2.5, 12), 1.0),
+      (1d, Vectors.dense(1, 3), 1.0),
+      (0d, Vectors.dense(10, 2), 1.0)
+    ).toDF("label", "rawPrediction", weightCol)
+    val result = evaluator.evaluate(vectorDF)
+    // without weight column
+    val evaluator2 = new BinaryClassificationEvaluator()
+      .setMetricName("areaUnderROC")
+    val result2 = evaluator2.evaluate(vectorDF)
+    assert(result === result2)
+    // use different weights, validate metrics change
+    val vectorDF2 = Seq(
+      (0d, Vectors.dense(2.5, 12), 2.5),
+      (1d, Vectors.dense(1, 3), 0.1),
+      (0d, Vectors.dense(10, 2), 2.0)
+    ).toDF("label", "rawPrediction", weightCol)
+    val result3 = evaluator.evaluate(vectorDF2)
+    // Since wrong result weighted more heavily, expect the score to be lower
+    assert(result3 < result)
+  }
+
   test("should support all NumericType labels and not support other types") {
     val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("prediction")
     MLTestingUtils.checkNumericTypes(evaluator, spark)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
index a08917ac1ebe..742eaec6623d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala
@@ -82,6 +82,34 @@ class BinaryClassificationMetricsSuite extends SparkFunSuite with MLlibTestSpark
     validateMetrics(metrics, thresholds, rocCurve, prCurve, f1, f2, precisions, recalls)
   }
 
+  test("binary evaluation metrics with weights") {
+    val w1 = 1.5
+    val w2 = 0.7
+    val w3 = 0.4
+    val scoreAndLabelsWithWeights = sc.parallelize(
+      Seq((0.1, (0.0, w1)), (0.1, (1.0, w2)), (0.4, (0.0, w1)), (0.6, (0.0, w3)),
+        (0.6, (1.0, w2)), (0.6, (1.0, w2)), (0.8, (1.0, w1))), 2)
+    val metrics = new BinaryClassificationMetrics(0, scoreAndLabelsWithWeights)
+    val thresholds = Seq(0.8, 0.6, 0.4, 0.1)
+    val numTruePositives =
+      Seq(1.0 * w1, 1.0 * w1 + 2.0 * w2, 1.0 * w1 + 2.0 * w2, 3.0 * w2 + 1.0 * w1)
+    val numFalsePositives = Seq(0.0, 1.0 * w3, 1.0 * w1 + 1.0 * w3, 1.0 * w3 + 2.0 * w1)
+    val numPositives = 3 * w2 + 1 * w1
+    val numNegatives = 2 * w1 + w3
+    val precisions = numTruePositives.zip(numFalsePositives).map { case (t, f) =>
+      t.toDouble / (t + f)
+    }
+    val recalls = numTruePositives.map(t => t / numPositives)
+    val fpr = numFalsePositives.map(f => f / numNegatives)
+    val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recalls) ++ Seq((1.0, 1.0))
+    val pr = recalls.zip(precisions)
+    val prCurve = Seq((0.0, 1.0)) ++ pr
+    val f1 = pr.map { case (r, p) => 2.0 * (p * r) / (p + r)}
+    val f2 = pr.map { case (r, p) => 5.0 * (p * r) / (4.0 * p + r)}
+
+    validateMetrics(metrics, thresholds, rocCurve, prCurve, f1, f2, precisions, recalls)
+  }
+
   test("binary evaluation metrics for RDD where all examples have positive label") {
     val scoreAndLabels = sc.parallelize(Seq((0.5, 1.0), (0.5, 1.0)), 2)
     val metrics = new BinaryClassificationMetrics(scoreAndLabels)

From 7f8d5ad2086b4ed9c2448eb26c3ce81181838f7e Mon Sep 17 00:00:00 2001
From: Ilya Matiach
Date: Mon, 26 Jun 2017 00:19:31 -0400
Subject: [PATCH 02/12] Updated based on comments - fixed since tag, renamed
 vars, added check for weight col

---
 .../BinaryClassificationEvaluator.scala       |  6 +++-
 .../BinaryClassificationMetrics.scala         |  6 ++--
 .../BinaryClassificationMetricComputers.scala | 12 +++----
 .../binary/BinaryConfusionMatrix.scala        | 36 +++++++++----------
 4 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index fc7cdd0b3042..88a435018b21 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -85,11 +85,15 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
     val schema = dataset.schema
     SchemaUtils.checkColumnTypes(schema, $(rawPredictionCol), Seq(DoubleType, new VectorUDT))
     SchemaUtils.checkNumericType(schema, $(labelCol))
+    if (isDefined(weightCol)) {
+      SchemaUtils.checkNumericType(schema, $(weightCol))
+    }
     // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
val scoreAndLabelsWithWeights = dataset.select(col($(rawPredictionCol)), col($(labelCol)).cast(DoubleType), - if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))) + if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) + else col($(weightCol)).cast(DoubleType)) .rdd.map { case Row(rawPrediction: Vector, label: Double, weight: Double) => (rawPrediction(1), (label, weight)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 0cd7d088739e..c6b9cb6758f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -52,12 +52,12 @@ class BinaryClassificationMetrics @Since("2.2.0") ( * Retrieves the score and labels (for binary compatibility). * @return The score and labels. */ - @Since("1.0.0") + @Since("1.3.0") def scoreAndLabels: RDD[(Double, Double)] = { scoreAndLabelsWithWeights.map(values => (values._1, values._2._1)) } - @Since("1.0.0") + @Since("1.3.0") def this(@Since("1.3.0") scoreAndLabels: RDD[(Double, Double)], @Since("1.3.0") numBins: Int) = this(numBins, scoreAndLabels.map(scoreAndLabel => (scoreAndLabel._1, (scoreAndLabel._2, 1.0)))) @@ -164,7 +164,7 @@ class BinaryClassificationMetrics @Since("2.2.0") ( // negatives within each bin, and then sort by score values in descending order. val counts = scoreAndLabelsWithWeights.combineByKey( createCombiner = (labelAndWeight: (Double, Double)) => - new BinaryLabelCounter(0L, 0L) += (labelAndWeight._1, labelAndWeight._2), + new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2), mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) => c += (labelAndWeight._1, labelAndWeight._2), mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala index 5a4c6aef50b7..784db3196c3a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala @@ -27,11 +27,11 @@ private[evaluation] trait BinaryClassificationMetricComputer extends Serializabl /** Precision. Defined as 1.0 when there are no positive examples. */ private[evaluation] object Precision extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { - val totalPositives = c.numTruePositives + c.numFalsePositives + val totalPositives = c.weightedTruePositives + c.weightedFalsePositives if (totalPositives == 0) { 1.0 } else { - c.numTruePositives.toDouble / totalPositives + c.weightedTruePositives.toDouble / totalPositives } } } @@ -39,10 +39,10 @@ private[evaluation] object Precision extends BinaryClassificationMetricComputer /** False positive rate. Defined as 0.0 when there are no negative examples. 
*/ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { - if (c.numNegatives == 0) { + if (c.weightedNegatives == 0) { 0.0 } else { - c.numFalsePositives.toDouble / c.numNegatives + c.weightedFalsePositives.toDouble / c.weightedNegatives } } } @@ -50,10 +50,10 @@ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricC /** Recall. Defined as 0.0 when there are no positive examples. */ private[evaluation] object Recall extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { - if (c.numPositives == 0) { + if (c.weightedPositives == 0) { 0.0 } else { - c.numTruePositives.toDouble / c.numPositives + c.weightedTruePositives.toDouble / c.weightedPositives } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala index 1c0130700421..c398fb830280 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala @@ -21,23 +21,23 @@ package org.apache.spark.mllib.evaluation.binary * Trait for a binary confusion matrix. */ private[evaluation] trait BinaryConfusionMatrix { - /** number of true positives */ - def numTruePositives: Double + /** weighted number of true positives */ + def weightedTruePositives: Double - /** number of false positives */ - def numFalsePositives: Double + /** weighted number of false positives */ + def weightedFalsePositives: Double - /** number of false negatives */ - def numFalseNegatives: Double + /** weighted number of false negatives */ + def weightedFalseNegatives: Double - /** number of true negatives */ - def numTrueNegatives: Double + /** weighted number of true negatives */ + def weightedTrueNegatives: Double - /** number of positives */ - def numPositives: Double = numTruePositives + numFalseNegatives + /** weighted number of positives */ + def weightedPositives: Double = weightedTruePositives + weightedFalseNegatives - /** number of negatives */ - def numNegatives: Double = numFalsePositives + numTrueNegatives + /** weighted number of negatives */ + def weightedNegatives: Double = weightedFalsePositives + weightedTrueNegatives } /** @@ -51,20 +51,20 @@ private[evaluation] case class BinaryConfusionMatrixImpl( totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix { /** number of true positives */ - override def numTruePositives: Double = count.numPositives + override def weightedTruePositives: Double = count.numPositives /** number of false positives */ - override def numFalsePositives: Double = count.numNegatives + override def weightedFalsePositives: Double = count.numNegatives /** number of false negatives */ - override def numFalseNegatives: Double = totalCount.numPositives - count.numPositives + override def weightedFalseNegatives: Double = totalCount.numPositives - count.numPositives /** number of true negatives */ - override def numTrueNegatives: Double = totalCount.numNegatives - count.numNegatives + override def weightedTrueNegatives: Double = totalCount.numNegatives - count.numNegatives /** number of positives */ - override def numPositives: Double = totalCount.numPositives + override def weightedPositives: Double = totalCount.numPositives /** number of negatives */ - override def numNegatives: Double = 
totalCount.numNegatives + override def weightedNegatives: Double = totalCount.numNegatives } From af96c4586a4976cd99b29457cb765c7a1dae72a0 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Mon, 4 Feb 2019 23:55:49 -0500 Subject: [PATCH 03/12] updated version --- .../spark/ml/evaluation/BinaryClassificationEvaluator.scala | 2 +- .../spark/mllib/evaluation/BinaryClassificationMetrics.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 88a435018b21..21d4d019cc78 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -75,7 +75,7 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va def setLabelCol(value: String): this.type = set(labelCol, value) /** @group setParam */ - @Since("2.2.0") + @Since("3.0.0") def setWeightCol(value: String): this.type = set(weightCol, value) setDefault(metricName -> "areaUnderROC") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index c6b9cb6758f7..022bceb61409 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -41,9 +41,9 @@ import org.apache.spark.sql.DataFrame * partition boundaries. */ @Since("1.0.0") -class BinaryClassificationMetrics @Since("2.2.0") ( +class BinaryClassificationMetrics @Since("3.0.0") ( val numBins: Int, - @Since("2.2.0") val scoreAndLabelsWithWeights: RDD[(Double, (Double, Double))]) + @Since("3.0.0") val scoreAndLabelsWithWeights: RDD[(Double, (Double, Double))]) extends Logging { require(numBins >= 0, "numBins must be nonnegative") From 16a03266a47b3f2a6266cc911526a444b8e5b0f1 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Tue, 12 Feb 2019 00:11:16 -0500 Subject: [PATCH 04/12] updated based on comments --- .../BinaryClassificationMetricComputers.scala | 12 +++++------ .../binary/BinaryConfusionMatrix.scala | 14 +++++++------ .../binary/BinaryLabelCounter.scala | 20 +++++++++---------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala index 784db3196c3a..d98ca2bdc9de 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryClassificationMetricComputers.scala @@ -28,10 +28,10 @@ private[evaluation] trait BinaryClassificationMetricComputer extends Serializabl private[evaluation] object Precision extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { val totalPositives = c.weightedTruePositives + c.weightedFalsePositives - if (totalPositives == 0) { + if (totalPositives == 0.0) { 1.0 } else { - c.weightedTruePositives.toDouble / totalPositives + c.weightedTruePositives / totalPositives } } } @@ -39,10 +39,10 @@ private[evaluation] object Precision extends 
BinaryClassificationMetricComputer /** False positive rate. Defined as 0.0 when there are no negative examples. */ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { - if (c.weightedNegatives == 0) { + if (c.weightedNegatives == 0.0) { 0.0 } else { - c.weightedFalsePositives.toDouble / c.weightedNegatives + c.weightedFalsePositives / c.weightedNegatives } } } @@ -50,10 +50,10 @@ private[evaluation] object FalsePositiveRate extends BinaryClassificationMetricC /** Recall. Defined as 0.0 when there are no positive examples. */ private[evaluation] object Recall extends BinaryClassificationMetricComputer { override def apply(c: BinaryConfusionMatrix): Double = { - if (c.weightedPositives == 0) { + if (c.weightedPositives == 0.0) { 0.0 } else { - c.weightedTruePositives.toDouble / c.weightedPositives + c.weightedTruePositives / c.weightedPositives } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala index c398fb830280..192c9b1863fe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryConfusionMatrix.scala @@ -51,20 +51,22 @@ private[evaluation] case class BinaryConfusionMatrixImpl( totalCount: BinaryLabelCounter) extends BinaryConfusionMatrix { /** number of true positives */ - override def weightedTruePositives: Double = count.numPositives + override def weightedTruePositives: Double = count.weightedNumPositives /** number of false positives */ - override def weightedFalsePositives: Double = count.numNegatives + override def weightedFalsePositives: Double = count.weightedNumNegatives /** number of false negatives */ - override def weightedFalseNegatives: Double = totalCount.numPositives - count.numPositives + override def weightedFalseNegatives: Double = + totalCount.weightedNumPositives - count.weightedNumPositives /** number of true negatives */ - override def weightedTrueNegatives: Double = totalCount.numNegatives - count.numNegatives + override def weightedTrueNegatives: Double = + totalCount.weightedNumNegatives - count.weightedNumNegatives /** number of positives */ - override def weightedPositives: Double = totalCount.numPositives + override def weightedPositives: Double = totalCount.weightedNumPositives /** number of negatives */ - override def weightedNegatives: Double = totalCount.numNegatives + override def weightedNegatives: Double = totalCount.weightedNumNegatives } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala index eb38af748ad9..1ad91966b214 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/binary/BinaryLabelCounter.scala @@ -20,18 +20,18 @@ package org.apache.spark.mllib.evaluation.binary /** * A counter for positives and negatives. 
* - * @param numPositives number of positive labels - * @param numNegatives number of negative labels + * @param weightedNumPositives weighted number of positive labels + * @param weightedNumNegatives weighted number of negative labels */ private[evaluation] class BinaryLabelCounter( - var numPositives: Double = 0.0, - var numNegatives: Double = 0.0) extends Serializable { + var weightedNumPositives: Double = 0.0, + var weightedNumNegatives: Double = 0.0) extends Serializable { /** Processes a label. */ def +=(label: Double): BinaryLabelCounter = { // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle // -1.0 for negative as well. - if (label > 0.5) numPositives += 1.0 else numNegatives += 1.0 + if (label > 0.5) weightedNumPositives += 1.0 else weightedNumNegatives += 1.0 this } @@ -39,20 +39,20 @@ private[evaluation] class BinaryLabelCounter( def +=(label: Double, weight: Double): BinaryLabelCounter = { // Though we assume 1.0 for positive and 0.0 for negative, the following check will handle // -1.0 for negative as well. - if (label > 0.5) numPositives += weight else numNegatives += weight + if (label > 0.5) weightedNumPositives += weight else weightedNumNegatives += weight this } /** Merges another counter. */ def +=(other: BinaryLabelCounter): BinaryLabelCounter = { - numPositives += other.numPositives - numNegatives += other.numNegatives + weightedNumPositives += other.weightedNumPositives + weightedNumNegatives += other.weightedNumNegatives this } override def clone: BinaryLabelCounter = { - new BinaryLabelCounter(numPositives, numNegatives) + new BinaryLabelCounter(weightedNumPositives, weightedNumNegatives) } - override def toString: String = s"{numPos: $numPositives, numNeg: $numNegatives}" + override def toString: String = s"{numPos: $weightedNumPositives, numNeg: $weightedNumNegatives}" } From 3d9104cec58167e27263f34f0e31509d917618bf Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Tue, 12 Feb 2019 23:55:39 -0500 Subject: [PATCH 05/12] made code more similar to other two PRs --- .../BinaryClassificationEvaluator.scala | 6 ++-- .../BinaryClassificationMetrics.scala | 29 ++++++++----------- .../BinaryClassificationMetricsSuite.scala | 6 ++-- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 21d4d019cc78..ce3abe9e019a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -96,11 +96,11 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va else col($(weightCol)).cast(DoubleType)) .rdd.map { case Row(rawPrediction: Vector, label: Double, weight: Double) => - (rawPrediction(1), (label, weight)) + (rawPrediction(1), label, weight) case Row(rawPrediction: Double, label: Double, weight: Double) => - (rawPrediction, (label, weight)) + (rawPrediction, label, weight) } - val metrics = new BinaryClassificationMetrics(defaultNumberOfBins, scoreAndLabelsWithWeights) + val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, defaultNumberOfBins) val metric = $(metricName) match { case "areaUnderROC" => metrics.areaUnderROC() case "areaUnderPR" => metrics.areaUnderPR() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 022bceb61409..77295e075ab3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.DataFrame /** * Evaluator for binary classification. * - * @param scoreAndLabelsWithWeights an RDD of (score, (label, weight)) pairs. + * @param scoreAndLabelsWithOptWeight an RDD of (score, label, weight) tuples. * @param numBins if greater than 0, then the curves (ROC curve, PR curve) computed internally * will be down-sampled to this many "bins". If 0, no down-sampling will occur. * This is useful because the curve contains a point for each distinct score @@ -42,24 +42,19 @@ import org.apache.spark.sql.DataFrame */ @Since("1.0.0") class BinaryClassificationMetrics @Since("3.0.0") ( - val numBins: Int, - @Since("3.0.0") val scoreAndLabelsWithWeights: RDD[(Double, (Double, Double))]) + @Since("1.3.0") val scoreAndLabelsWithOptWeight: RDD[_ <: Product], + @Since("1.3.0") val numBins: Int) extends Logging { - - require(numBins >= 0, "numBins must be nonnegative") - - /** - * Retrieves the score and labels (for binary compatibility). - * @return The score and labels. - */ - @Since("1.3.0") - def scoreAndLabels: RDD[(Double, Double)] = { - scoreAndLabelsWithWeights.map(values => (values._1, values._2._1)) + val scoreLabelsWeight: RDD[(Double, (Double, Double))] = scoreAndLabelsWithOptWeight.map { + case (prediction: Double, label: Double, weight: Double) => + (prediction, (label, weight)) + case (prediction: Double, label: Double) => + (prediction, (label, 1.0)) + case other => + throw new IllegalArgumentException(s"Expected tuples, got $other") } - @Since("1.3.0") - def this(@Since("1.3.0") scoreAndLabels: RDD[(Double, Double)], @Since("1.3.0") numBins: Int) = - this(numBins, scoreAndLabels.map(scoreAndLabel => (scoreAndLabel._1, (scoreAndLabel._2, 1.0)))) + require(numBins >= 0, "numBins must be nonnegative") /** * Defaults `numBins` to 0. @@ -162,7 +157,7 @@ class BinaryClassificationMetrics @Since("3.0.0") ( confusions: RDD[(Double, BinaryConfusionMatrix)]) = { // Create a bin for each distinct score value, count weighted positives and // negatives within each bin, and then sort by score values in descending order. 
- val counts = scoreAndLabelsWithWeights.combineByKey( + val counts = scoreLabelsWeight.combineByKey( createCombiner = (labelAndWeight: (Double, Double)) => new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2), mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) => diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala index 742eaec6623d..41afbf77a3e5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala @@ -87,9 +87,9 @@ class BinaryClassificationMetricsSuite extends SparkFunSuite with MLlibTestSpark val w2 = 0.7 val w3 = 0.4 val scoreAndLabelsWithWeights = sc.parallelize( - Seq((0.1, (0.0, w1)), (0.1, (1.0, w2)), (0.4, (0.0, w1)), (0.6, (0.0, w3)), - (0.6, (1.0, w2)), (0.6, (1.0, w2)), (0.8, (1.0, w1))), 2) - val metrics = new BinaryClassificationMetrics(0, scoreAndLabelsWithWeights) + Seq((0.1, 0.0, w1), (0.1, 1.0, w2), (0.4, 0.0, w1), (0.6, 0.0, w3), + (0.6, 1.0, w2), (0.6, 1.0, w2), (0.8, 1.0, w1)), 2) + val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, 0) val thresholds = Seq(0.8, 0.6, 0.4, 0.1) val numTruePositives = Seq(1.0 * w1, 1.0 * w1 + 2.0 * w2, 1.0 * w1 + 2.0 * w2, 3.0 * w2 + 1.0 * w1) From d99f6d1e48519fb5f5b56b048612900aa585dd71 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Wed, 13 Feb 2019 00:19:52 -0500 Subject: [PATCH 06/12] fix MIMA --- .../mllib/evaluation/BinaryClassificationMetrics.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 77295e075ab3..aa4a9e7e2ce2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -62,6 +62,15 @@ class BinaryClassificationMetrics @Since("3.0.0") ( @Since("1.0.0") def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0) + /** + * Retrieves the score and labels (for binary compatibility). + * @return The score and labels. + */ + @Since("1.0.0") + def scoreAndLabels: RDD[(Double, Double)] = { + scoreLabelsWeight.map(values => (values._1, values._2._1)) + } + /** * An auxiliary constructor taking a DataFrame. 
* @param scoreAndLabels a DataFrame with two double columns: score and label From fd80790e7a80b3ffa2a830f0c2305adc07cdd39c Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Fri, 15 Feb 2019 00:49:04 -0500 Subject: [PATCH 07/12] updated based on comments and fixed style --- .../ml/evaluation/BinaryClassificationEvaluator.scala | 7 ++++--- .../mllib/evaluation/BinaryClassificationMetrics.scala | 10 +++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index ce3abe9e019a..f43a226cee1c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -91,10 +91,11 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2. val scoreAndLabelsWithWeights = - dataset.select(col($(rawPredictionCol)), col($(labelCol)).cast(DoubleType), + dataset.select( + col($(rawPredictionCol)), + col($(labelCol)).cast(DoubleType), if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) - else col($(weightCol)).cast(DoubleType)) - .rdd.map { + else col($(weightCol)).cast(DoubleType)).rdd.map { case Row(rawPrediction: Vector, label: Double, weight: Double) => (rawPrediction(1), label, weight) case Row(rawPrediction: Double, label: Double, weight: Double) => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index aa4a9e7e2ce2..a1672767c083 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -63,12 +63,12 @@ class BinaryClassificationMetrics @Since("3.0.0") ( def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0) /** - * Retrieves the score and labels (for binary compatibility). - * @return The score and labels. - */ + * Retrieves the score and labels (for binary compatibility). + * @return The score and labels. + */ @Since("1.0.0") def scoreAndLabels: RDD[(Double, Double)] = { - scoreLabelsWeight.map(values => (values._1, values._2._1)) + scoreLabelsWeight.map { case (prediction, (label, _)) => (prediction, label) } } /** @@ -168,7 +168,7 @@ class BinaryClassificationMetrics @Since("3.0.0") ( // negatives within each bin, and then sort by score values in descending order. 
val counts = scoreLabelsWeight.combineByKey( createCombiner = (labelAndWeight: (Double, Double)) => - new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2), + new BinaryLabelCounter(labelAndWeight._1, labelAndWeight._2), mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) => c += (labelAndWeight._1, labelAndWeight._2), mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2 From bde306907fbf6f43801ce6a7a3037f7017e13379 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Mon, 18 Feb 2019 23:32:07 -0500 Subject: [PATCH 08/12] updated based on comments --- .../ml/evaluation/BinaryClassificationEvaluator.scala | 7 +------ .../mllib/evaluation/BinaryClassificationMetrics.scala | 5 +++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index f43a226cee1c..c6b04333885a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -42,11 +42,6 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va @Since("1.2.0") def this() = this(Identifiable.randomUID("binEval")) - /** - * Default number of bins to use for binary classification evaluation. - */ - val defaultNumberOfBins = 1000 - /** * param for metric name in evaluation (supports `"areaUnderROC"` (default), `"areaUnderPR"`) * @group param @@ -101,7 +96,7 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va case Row(rawPrediction: Double, label: Double, weight: Double) => (rawPrediction, label, weight) } - val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, defaultNumberOfBins) + val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights) val metric = $(metricName) match { case "areaUnderROC" => metrics.areaUnderROC() case "areaUnderPR" => metrics.areaUnderPR() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index a1672767c083..11da5c03c351 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -43,10 +43,11 @@ import org.apache.spark.sql.DataFrame @Since("1.0.0") class BinaryClassificationMetrics @Since("3.0.0") ( @Since("1.3.0") val scoreAndLabelsWithOptWeight: RDD[_ <: Product], - @Since("1.3.0") val numBins: Int) + @Since("1.3.0") val numBins: Int = 1000) extends Logging { val scoreLabelsWeight: RDD[(Double, (Double, Double))] = scoreAndLabelsWithOptWeight.map { case (prediction: Double, label: Double, weight: Double) => + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") (prediction, (label, weight)) case (prediction: Double, label: Double) => (prediction, (label, 1.0)) @@ -66,7 +67,7 @@ class BinaryClassificationMetrics @Since("3.0.0") ( * Retrieves the score and labels (for binary compatibility). * @return The score and labels. 
*/ - @Since("1.0.0") + @Since("1.3.0") def scoreAndLabels: RDD[(Double, Double)] = { scoreLabelsWeight.map { case (prediction, (label, _)) => (prediction, label) } } From 955b0226e8614b5e024da2cc444a084429efae3f Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Mon, 18 Feb 2019 23:57:04 -0500 Subject: [PATCH 09/12] fixed failing tests --- .../spark/mllib/evaluation/BinaryClassificationMetrics.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 11da5c03c351..f36a66a127de 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -169,7 +169,7 @@ class BinaryClassificationMetrics @Since("3.0.0") ( // negatives within each bin, and then sort by score values in descending order. val counts = scoreLabelsWeight.combineByKey( createCombiner = (labelAndWeight: (Double, Double)) => - new BinaryLabelCounter(labelAndWeight._1, labelAndWeight._2), + new BinaryLabelCounter(0.0, 0.0) += (labelAndWeight._1, labelAndWeight._2), mergeValue = (c: BinaryLabelCounter, labelAndWeight: (Double, Double)) => c += (labelAndWeight._1, labelAndWeight._2), mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2 From 079e1140792cd4e3c5fb21ebbb1dd9aca370c977 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Thu, 21 Feb 2019 23:46:18 -0500 Subject: [PATCH 10/12] updated doc based on comments --- .../spark/mllib/evaluation/BinaryClassificationMetrics.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index f36a66a127de..7a4d901bb1c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.DataFrame /** * Evaluator for binary classification. * - * @param scoreAndLabelsWithOptWeight an RDD of (score, label, weight) tuples. + * @param scoreAndLabelsWithOptWeight an RDD of (score, label) or (score, label, weight) tuples. * @param numBins if greater than 0, then the curves (ROC curve, PR curve) computed internally * will be down-sampled to this many "bins". If 0, no down-sampling will occur. * This is useful because the curve contains a point for each distinct score From a571adcf5dcbcfb7fcd357fecbf74093df95d4aa Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Sat, 23 Feb 2019 23:54:29 -0500 Subject: [PATCH 11/12] updated python code --- python/pyspark/ml/evaluation.py | 18 +++++++++++++----- python/pyspark/mllib/evaluation.py | 22 +++++++++++++++++----- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index f563a2d4d283..57c1887855fd 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -106,7 +106,7 @@ def isLargerBetter(self): @inherit_doc -class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol, +class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol, HasWeightCol, JavaMLReadable, JavaMLWritable): """ .. 
note:: Experimental @@ -130,6 +130,14 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction >>> evaluator2 = BinaryClassificationEvaluator.load(bce_path) >>> str(evaluator2.getRawPredictionCol()) 'raw' + >>> scoreAndLabelsAndWeight = map(lambda x: (Vectors.dense([1.0 - x[0], x[0]]), x[1], x[2]), + ... [(0.1, 0.0, 1.0), (0.1, 1.0, 0.9), (0.4, 0.0, 0.7), (0.6, 0.0, 0.9), + ... (0.6, 1.0, 1.0), (0.6, 1.0, 0.3), (0.8, 1.0, 1.0)]) + >>> dataset = spark.createDataFrame(scoreAndLabelsAndWeight, ["raw", "label", "weight"]) + ... + >>> evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw", weightCol="weight") + >>> evaluator.evaluate(dataset) + 0.70... .. versionadded:: 1.4.0 """ @@ -140,10 +148,10 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction @keyword_only def __init__(self, rawPredictionCol="rawPrediction", labelCol="label", - metricName="areaUnderROC"): + metricName="areaUnderROC", weightCol=None): """ __init__(self, rawPredictionCol="rawPrediction", labelCol="label", \ - metricName="areaUnderROC") + metricName="areaUnderROC", weightCol=None) """ super(BinaryClassificationEvaluator, self).__init__() self._java_obj = self._new_java_obj( @@ -169,10 +177,10 @@ def getMetricName(self): @keyword_only @since("1.4.0") def setParams(self, rawPredictionCol="rawPrediction", labelCol="label", - metricName="areaUnderROC"): + metricName="areaUnderROC", weightCol=None): """ setParams(self, rawPredictionCol="rawPrediction", labelCol="label", \ - metricName="areaUnderROC") + metricName="areaUnderROC", weightCol=None) Sets params for binary classification evaluator. """ kwargs = self._input_kwargs diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index b0283941171a..ca1a3be16017 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -30,7 +30,7 @@ class BinaryClassificationMetrics(JavaModelWrapper): """ Evaluator for binary classification. - :param scoreAndLabels: an RDD of (score, label) pairs + :param scoreAndLabelsWithOptWeight: an RDD of score, label and optional weight. >>> scoreAndLabels = sc.parallelize([ ... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2) @@ -40,16 +40,28 @@ class BinaryClassificationMetrics(JavaModelWrapper): >>> metrics.areaUnderPR 0.83... >>> metrics.unpersist() + >>> scoreAndLabelsWithOptWeight = sc.parallelize([ + ... (0.1, 0.0, 1.0), (0.1, 1.0, 0.4), (0.4, 0.0, 0.2), (0.6, 0.0, 0.6), (0.6, 1.0, 0.9), + ... (0.6, 1.0, 0.5), (0.8, 1.0, 0.7)], 2) + >>> metrics = BinaryClassificationMetrics(scoreAndLabelsWithOptWeight) + >>> metrics.areaUnderROC + 0.70... + >>> metrics.areaUnderPR + 0.83... .. 
versionadded:: 1.4.0 """ - def __init__(self, scoreAndLabels): - sc = scoreAndLabels.ctx + def __init__(self, scoreAndLabelsWithOptWeight): + sc = scoreAndLabelsWithOptWeight.ctx sql_ctx = SQLContext.getOrCreate(sc) - df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([ + numCol = len(scoreAndLabelsWithOptWeight.first()) + schema = StructType([ StructField("score", DoubleType(), nullable=False), - StructField("label", DoubleType(), nullable=False)])) + StructField("label", DoubleType(), nullable=False)]) + if (numCol == 3): + schema.add("weight", DoubleType(), False) + df = sql_ctx.createDataFrame(scoreAndLabelsWithOptWeight, schema=schema) java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics java_model = java_class(df._jdf) super(BinaryClassificationMetrics, self).__init__(java_model) From 00bfec119b2e15911fab616fe9f755d364cad3b2 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Sun, 24 Feb 2019 22:54:47 -0500 Subject: [PATCH 12/12] updated based on new comments --- .../BinaryClassificationMetrics.scala | 26 ++++++++-------- .../mllib/evaluation/MulticlassMetrics.scala | 19 ++++++++---- .../BinaryClassificationEvaluatorSuite.scala | 30 +++++++++---------- .../BinaryClassificationMetricsSuite.scala | 6 ++-- python/pyspark/ml/evaluation.py | 2 ++ python/pyspark/mllib/evaluation.py | 28 ++++++++--------- 6 files changed, 59 insertions(+), 52 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 7a4d901bb1c7..cc89edc286a3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -21,12 +21,12 @@ import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.mllib.evaluation.binary._ import org.apache.spark.rdd.{RDD, UnionRDD} -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} /** * Evaluator for binary classification. * - * @param scoreAndLabelsWithOptWeight an RDD of (score, label) or (score, label, weight) tuples. + * @param scoreAndLabels an RDD of (score, label) or (score, label, weight) tuples. * @param numBins if greater than 0, then the curves (ROC curve, PR curve) computed internally * will be down-sampled to this many "bins". If 0, no down-sampling will occur. * This is useful because the curve contains a point for each distinct score @@ -42,10 +42,10 @@ import org.apache.spark.sql.DataFrame */ @Since("1.0.0") class BinaryClassificationMetrics @Since("3.0.0") ( - @Since("1.3.0") val scoreAndLabelsWithOptWeight: RDD[_ <: Product], + @Since("1.3.0") val scoreAndLabels: RDD[_ <: Product], @Since("1.3.0") val numBins: Int = 1000) extends Logging { - val scoreLabelsWeight: RDD[(Double, (Double, Double))] = scoreAndLabelsWithOptWeight.map { + val scoreLabelsWeight: RDD[(Double, (Double, Double))] = scoreAndLabels.map { case (prediction: Double, label: Double, weight: Double) => require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") (prediction, (label, weight)) @@ -63,21 +63,19 @@ class BinaryClassificationMetrics @Since("3.0.0") ( @Since("1.0.0") def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0) - /** - * Retrieves the score and labels (for binary compatibility). - * @return The score and labels. 
- */ - @Since("1.3.0") - def scoreAndLabels: RDD[(Double, Double)] = { - scoreLabelsWeight.map { case (prediction, (label, _)) => (prediction, label) } - } - /** * An auxiliary constructor taking a DataFrame. * @param scoreAndLabels a DataFrame with two double columns: score and label */ private[mllib] def this(scoreAndLabels: DataFrame) = - this(scoreAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) + this(scoreAndLabels.rdd.map { + case Row(prediction: Double, label: Double, weight: Double) => + (prediction, label, weight) + case Row(prediction: Double, label: Double) => + (prediction, label, 1.0) + case other => + throw new IllegalArgumentException(s"Expected Row of tuples, got $other") + }) /** * Unpersist intermediate RDDs used in the computation. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala index ad83c24ede96..a10f26ba4640 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala @@ -22,17 +22,17 @@ import scala.collection.Map import org.apache.spark.annotation.Since import org.apache.spark.mllib.linalg.{Matrices, Matrix} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} /** * Evaluator for multiclass classification. * - * @param predAndLabelsWithOptWeight an RDD of (prediction, label, weight) or - * (prediction, label) pairs. + * @param predictionAndLabels an RDD of (prediction, label, weight) or + * (prediction, label) tuples. */ @Since("1.1.0") -class MulticlassMetrics @Since("1.1.0") (predAndLabelsWithOptWeight: RDD[_ <: Product]) { - val predLabelsWeight: RDD[(Double, Double, Double)] = predAndLabelsWithOptWeight.map { +class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[_ <: Product]) { + val predLabelsWeight: RDD[(Double, Double, Double)] = predictionAndLabels.map { case (prediction: Double, label: Double, weight: Double) => (prediction, label, weight) case (prediction: Double, label: Double) => @@ -46,7 +46,14 @@ class MulticlassMetrics @Since("1.1.0") (predAndLabelsWithOptWeight: RDD[_ <: Pr * @param predictionAndLabels a DataFrame with two double columns: prediction and label */ private[mllib] def this(predictionAndLabels: DataFrame) = - this(predictionAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1)))) + this(predictionAndLabels.rdd.map { + case Row(prediction: Double, label: Double, weight: Double) => + (prediction, label, weight) + case Row(prediction: Double, label: Double) => + (prediction, label, 1.0) + case other => + throw new IllegalArgumentException(s"Expected Row of tuples, got $other") + }) private lazy val labelCountByClass: Map[Double, Double] = predLabelsWeight.map { diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala index 1bd2db79573d..83b213ab51d4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala @@ -45,23 +45,23 @@ class BinaryClassificationEvaluatorSuite .setMetricName("areaUnderPR") val vectorDF = Seq( - (0d, Vectors.dense(12, 2.5)), - (1d, Vectors.dense(1, 3)), - (0d, Vectors.dense(10, 2)) + (0.0, Vectors.dense(12, 
2.5)), + (1.0, Vectors.dense(1, 3)), + (0.0, Vectors.dense(10, 2)) ).toDF("label", "rawPrediction") assert(evaluator.evaluate(vectorDF) === 1.0) val doubleDF = Seq( - (0d, 0d), - (1d, 1d), - (0d, 0d) + (0.0, 0.0), + (1.0, 1.0), + (0.0, 0.0) ).toDF("label", "rawPrediction") assert(evaluator.evaluate(doubleDF) === 1.0) val stringDF = Seq( - (0d, "0d"), - (1d, "1d"), - (0d, "0d") + (0.0, "0.0"), + (1.0, "1.0"), + (0.0, "0.0") ).toDF("label", "rawPrediction") val thrown = intercept[IllegalArgumentException] { evaluator.evaluate(stringDF) @@ -77,9 +77,9 @@ class BinaryClassificationEvaluatorSuite val evaluator = new BinaryClassificationEvaluator() .setMetricName("areaUnderROC").setWeightCol(weightCol) val vectorDF = Seq( - (0d, Vectors.dense(2.5, 12), 1.0), - (1d, Vectors.dense(1, 3), 1.0), - (0d, Vectors.dense(10, 2), 1.0) + (0.0, Vectors.dense(2.5, 12), 1.0), + (1.0, Vectors.dense(1, 3), 1.0), + (0.0, Vectors.dense(10, 2), 1.0) ).toDF("label", "rawPrediction", weightCol) val result = evaluator.evaluate(vectorDF) // without weight column @@ -89,9 +89,9 @@ class BinaryClassificationEvaluatorSuite assert(result === result2) // use different weights, validate metrics change val vectorDF2 = Seq( - (0d, Vectors.dense(2.5, 12), 2.5), - (1d, Vectors.dense(1, 3), 0.1), - (0d, Vectors.dense(10, 2), 2.0) + (0.0, Vectors.dense(2.5, 12), 2.5), + (1.0, Vectors.dense(1, 3), 0.1), + (0.0, Vectors.dense(10, 2), 2.0) ).toDF("label", "rawPrediction", weightCol) val result3 = evaluator.evaluate(vectorDF2) // Since wrong result weighted more heavily, expect the score to be lower diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala index 41afbf77a3e5..06a522f43482 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala @@ -92,15 +92,15 @@ class BinaryClassificationMetricsSuite extends SparkFunSuite with MLlibTestSpark val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, 0) val thresholds = Seq(0.8, 0.6, 0.4, 0.1) val numTruePositives = - Seq(1.0 * w1, 1.0 * w1 + 2.0 * w2, 1.0 * w1 + 2.0 * w2, 3.0 * w2 + 1.0 * w1) + Seq(1 * w1, 1 * w1 + 2 * w2, 1 * w1 + 2 * w2, 3 * w2 + 1 * w1) val numFalsePositives = Seq(0.0, 1.0 * w3, 1.0 * w1 + 1.0 * w3, 1.0 * w3 + 2.0 * w1) val numPositives = 3 * w2 + 1 * w1 val numNegatives = 2 * w1 + w3 val precisions = numTruePositives.zip(numFalsePositives).map { case (t, f) => t.toDouble / (t + f) } - val recalls = numTruePositives.map(t => t / numPositives) - val fpr = numFalsePositives.map(f => f / numNegatives) + val recalls = numTruePositives.map(_ / numPositives) + val fpr = numFalsePositives.map(_ / numNegatives) val rocCurve = Seq((0.0, 0.0)) ++ fpr.zip(recalls) ++ Seq((1.0, 1.0)) val pr = recalls.zip(precisions) val prCurve = Seq((0.0, 1.0)) ++ pr diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 57c1887855fd..0f70860ceaf0 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -138,6 +138,8 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction >>> evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw", weightCol="weight") >>> evaluator.evaluate(dataset) 0.70... + >>> evaluator.evaluate(dataset, {evaluator.metricName: "areaUnderPR"}) + 0.82... .. 
versionadded:: 1.4.0 """ diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index ca1a3be16017..5d8d20dcfcfc 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -30,7 +30,7 @@ class BinaryClassificationMetrics(JavaModelWrapper): """ Evaluator for binary classification. - :param scoreAndLabelsWithOptWeight: an RDD of score, label and optional weight. + :param scoreAndLabels: an RDD of score, label and optional weight. >>> scoreAndLabels = sc.parallelize([ ... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2) @@ -45,23 +45,23 @@ class BinaryClassificationMetrics(JavaModelWrapper): ... (0.6, 1.0, 0.5), (0.8, 1.0, 0.7)], 2) >>> metrics = BinaryClassificationMetrics(scoreAndLabelsWithOptWeight) >>> metrics.areaUnderROC - 0.70... + 0.79... >>> metrics.areaUnderPR - 0.83... + 0.88... .. versionadded:: 1.4.0 """ - def __init__(self, scoreAndLabelsWithOptWeight): - sc = scoreAndLabelsWithOptWeight.ctx + def __init__(self, scoreAndLabels): + sc = scoreAndLabels.ctx sql_ctx = SQLContext.getOrCreate(sc) - numCol = len(scoreAndLabelsWithOptWeight.first()) + numCol = len(scoreAndLabels.first()) schema = StructType([ StructField("score", DoubleType(), nullable=False), StructField("label", DoubleType(), nullable=False)]) - if (numCol == 3): + if numCol == 3: schema.add("weight", DoubleType(), False) - df = sql_ctx.createDataFrame(scoreAndLabelsWithOptWeight, schema=schema) + df = sql_ctx.createDataFrame(scoreAndLabels, schema=schema) java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics java_model = java_class(df._jdf) super(BinaryClassificationMetrics, self).__init__(java_model) @@ -174,7 +174,7 @@ class MulticlassMetrics(JavaModelWrapper): """ Evaluator for multiclass classification. - :param predAndLabelsWithOptWeight: an RDD of prediction, label and optional weight. + :param predictionAndLabels: an RDD of prediction, label and optional weight. >>> predictionAndLabels = sc.parallelize([(0.0, 0.0), (0.0, 1.0), (0.0, 0.0), ... (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)]) @@ -235,16 +235,16 @@ class MulticlassMetrics(JavaModelWrapper): .. versionadded:: 1.4.0 """ - def __init__(self, predAndLabelsWithOptWeight): - sc = predAndLabelsWithOptWeight.ctx + def __init__(self, predictionAndLabels): + sc = predictionAndLabels.ctx sql_ctx = SQLContext.getOrCreate(sc) - numCol = len(predAndLabelsWithOptWeight.first()) + numCol = len(predictionAndLabels.first()) schema = StructType([ StructField("prediction", DoubleType(), nullable=False), StructField("label", DoubleType(), nullable=False)]) - if (numCol == 3): + if numCol == 3: schema.add("weight", DoubleType(), False) - df = sql_ctx.createDataFrame(predAndLabelsWithOptWeight, schema) + df = sql_ctx.createDataFrame(predictionAndLabels, schema) java_class = sc._jvm.org.apache.spark.mllib.evaluation.MulticlassMetrics java_model = java_class(df._jdf) super(MulticlassMetrics, self).__init__(java_model)
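
A note for readers of the series: the end-to-end behavior the twelve patches converge on can be exercised with a short standalone program. The sketch below is written against the final, post-PATCH-12 shape of the API (an RDD of (score, label, weight) tuples for BinaryClassificationMetrics, and setWeightCol on the evaluator); the object name, app name, and column names are illustrative only and not part of the patches.

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.SparkSession

object WeightedAucExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("weighted-auc-example").getOrCreate()
    import spark.implicits._

    // RDD API: (score, label, weight) tuples are accepted after PATCH 05/12.
    val scoreLabelWeight = spark.sparkContext.parallelize(Seq(
      (0.1, 0.0, 1.5), (0.4, 0.0, 1.5), (0.6, 1.0, 0.7), (0.8, 1.0, 1.5)), 2)
    val metrics = new BinaryClassificationMetrics(scoreLabelWeight, 0)
    println(s"weighted areaUnderROC = ${metrics.areaUnderROC()}")

    // DataFrame API: the evaluator reads the weight column once it is set.
    val df = Seq(
      (0.0, Vectors.dense(12.0, 2.5), 1.5),
      (1.0, Vectors.dense(1.0, 3.0), 0.7),
      (0.0, Vectors.dense(10.0, 2.0), 1.5)
    ).toDF("label", "rawPrediction", "weight")
    val evaluator = new BinaryClassificationEvaluator()
      .setMetricName("areaUnderROC")
      .setWeightCol("weight")
    println(s"evaluator areaUnderROC = ${evaluator.evaluate(df)}")

    spark.stop()
  }
}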
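
The weighted counting that PATCH 01 and PATCH 04 build into BinaryLabelCounter can also be checked by hand against the expected values in the "binary evaluation metrics with weights" test. Here is a self-contained sketch in plain Scala that mirrors the counter's logic rather than importing Spark; Counter and WeightedCountCheck are illustrative names, not types from the patches.

// Mirrors BinaryLabelCounter's weighted accumulation: a label > 0.5 counts as
// positive, anything else (0.0, or -1.0) as negative, each adding its weight.
final case class Counter(var pos: Double = 0.0, var neg: Double = 0.0) {
  def add(label: Double, weight: Double): Counter = {
    if (label > 0.5) pos += weight else neg += weight
    this
  }
}

object WeightedCountCheck {
  def main(args: Array[String]): Unit = {
    val (w1, w2, w3) = (1.5, 0.7, 0.4)
    // Same (score, label, weight) data as the weighted test in the suite.
    val data = Seq(
      (0.1, 0.0, w1), (0.1, 1.0, w2), (0.4, 0.0, w1), (0.6, 0.0, w3),
      (0.6, 1.0, w2), (0.6, 1.0, w2), (0.8, 1.0, w1))
    // Cumulative weighted TP/FP at each descending threshold, as the
    // cumulativeCounts RDD computes them.
    for (t <- Seq(0.8, 0.6, 0.4, 0.1)) {
      val c = data.filter(_._1 >= t)
        .foldLeft(Counter()) { case (acc, (_, label, w)) => acc.add(label, w) }
      println(f"threshold $t%.1f: weighted TP = ${c.pos}%.2f, weighted FP = ${c.neg}%.2f")
    }
    // Prints 1.50/0.00, 2.90/0.40, 2.90/1.90, 3.60/3.40, matching the suite's
    // numTruePositives and numFalsePositives sequences.
  }
}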