From eb61b36bdd022872cab63a1e85e7c9685f507359 Mon Sep 17 00:00:00 2001
From: Ahmed Mahran
Date: Wed, 7 Dec 2022 13:40:12 +0200
Subject: [PATCH 1/2] [SPARK-41008][MLLIB] Dedup isotonic regression duplicate features

---
 .../mllib/regression/IsotonicRegression.scala |  136 ++++++++++---
 .../regression/IsotonicRegressionSuite.scala  |  180 ++++++++++++++----
 2 files changed, 257 insertions(+), 59 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 649f9816e6a5a..09054f9b7b0e4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.mllib.regression
 
 import java.io.Serializable
@@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.commons.math3.util.Precision
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -307,6 +307,99 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
   }
 
+  /**
+   * Aggregates points with duplicate feature values into a single point whose label is the
+   * weighted average of the labels of those points. All points sharing a unique feature value
+   * are aggregated as follows:
+   *
+   *   - Aggregated label is the weighted average of all labels
+   *   - Aggregated feature is the weighted average of all equal features[1]
+   *   - Aggregated weight is the sum of all weights
+   *
+   * [1] Note: It is possible for feature values to be equal only up to some resolution due to
+   * representation errors. Since we cannot know which feature value to use in that case, we
+   * compute the weighted average of the features. Ideally, all feature values will be equal
+   * and the weighted average is just the value at any point.
+   *
+   * @param input
+   *   Input data of tuples (label, feature, weight). Weights must be non-negative.
+   * @return
+   *   Points with unique feature values.
+   */
+  private[regression] def makeUnique(
+      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+
+    val cleanInput = input.filter { case (y, x, weight) =>
+      require(
+        weight >= 0.0,
+        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative")
+      weight > 0
+    }
+
+    if (cleanInput.isEmpty) Array.empty
+    else if (cleanInput.length <= 1) cleanInput
+    else {
+      // Whether two double features are equal up to a given precision.
+      @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)
+
+      // Utility object, holds a buffer of all points with unique features so far, and performs
+      // weighted sum accumulation of points. Hides these details for better readability of the
+      // main algorithm.
+      object PointsAccumulator {
+        private val output = ArrayBuffer[(Double, Double, Double)]()
+        private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
+          (0d, 0d, 0d)
+
+        /** Resets the current value of the point accumulator using the provided point. */
+        def :=(point: (Double, Double, Double)): Unit = {
+          val (label, feature, weight) = point
+          currentLabel = label * weight
+          currentFeature = feature * weight
+          currentWeight = weight
+        }
+
+        /** Accumulates the provided point into the current value of the point accumulator. */
+        def +=(point: (Double, Double, Double)): Unit = {
+          val (label, feature, weight) = point
+          currentLabel += label * weight
+          currentFeature += feature * weight
+          currentWeight += weight
+        }
+
+        /** Appends the current value of the point accumulator to the output. */
+        def appendToOutput(): Unit =
+          output += ((
+            currentLabel / currentWeight,
+            currentFeature / currentWeight,
+            currentWeight))
+
+        /** Returns all accumulated points so far. */
+        def getOutput: Array[(Double, Double, Double)] = output.toArray
+      }
+
+      var (_, prevFeature, _) = cleanInput.head
+
+      // Go through input points, merging all points with approximately equal feature values into
+      // a single point. Equality of features is defined by the areEqual method. The label of the
+      // accumulated point is the weighted average of the labels of all points of equal feature
+      // value. It is possible for feature values to be equal only up to some resolution due to
+      // representation errors; since we cannot know which feature value to use in that case,
+      // we compute the weighted average of the features.
+      cleanInput.foreach { case point @ (_, feature, _) =>
+        if (areEqual(feature, prevFeature)) {
+          PointsAccumulator += point
+        } else {
+          PointsAccumulator.appendToOutput()
+          PointsAccumulator := point
+        }
+        prevFeature = feature
+      }
+      // Append the last accumulated point
+      PointsAccumulator.appendToOutput()
+      PointsAccumulator.getOutput
+    }
+  }
+
   /**
    * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
    * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
@@ -322,35 +415,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
    * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
    * 658-672.
    *
-   * @param input Input data of tuples (label, feature, weight). Weights must
-   *              be non-negative.
+   * @param cleanUniqueInput Input data of tuples (label, feature, weight). Features must be
+   *                         unique and weights must be non-negative.
    * @return Result tuples (label, feature, weight) where labels were updated
    *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
-      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
-
-    val cleanInput = input.filter{ case (y, x, weight) =>
-      require(
-        weight >= 0.0,
-        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
-      )
-      weight > 0
-    }
+      cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
 
-    if (cleanInput.isEmpty) {
+    if (cleanUniqueInput.isEmpty) {
       return Array.empty
     }
 
     // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
     // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
     // Initially, each data point is its own block.
-    val blockBounds = Array.range(0, cleanInput.length)
+    val blockBounds = Array.range(0, cleanUniqueInput.length)
 
     // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
     // gives the values for the block. Entries that are not at the start of a block
     // are meaningless.
-    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+    val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) =>
       (weight, weight * y)
     }
 
@@ -392,10 +477,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
     // want to compress our output as much as possible. Both give correct results.
     var i = 0
-    while (nextBlock(i) < cleanInput.length) {
+    while (nextBlock(i) < cleanUniqueInput.length) {
       if (average(i) >= average(nextBlock(i))) {
         merge(i, nextBlock(i))
-        while((i > 0) && (average(prevBlock(i)) >= average(i))) {
+        while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
           i = merge(prevBlock(i), i)
         }
       } else {
@@ -406,15 +491,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     // construct the output by walking through the blocks in order
     val output = ArrayBuffer.empty[(Double, Double, Double)]
     i = 0
-    while (i < cleanInput.length) {
+    while (i < cleanUniqueInput.length) {
       // If block size is > 1, a point at the start and end of the block,
       // each receiving half the weight. Otherwise, a single point with
       // all the weight.
-      if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
-        output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
+      if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) {
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2))
+        output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2))
       } else {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1))
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1))
       }
       i = nextBlock(i)
     }
@@ -434,12 +519,17 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
       input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
     val keyedInput = input.keyBy(_._2)
     val parallelStepResult = keyedInput
+      // Points with equal or adjacent feature values must be collocated within the same partition.
      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
      .values
+      // Lexicographically sort points by feature value, then by label.
      .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
+      // Aggregate points with equal feature values into a single point.
+      .map(makeUnique)
      .flatMap(poolAdjacentViolators)
      .collect()
-      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
+      // Sort again because collect() doesn't promise ordering.
+ .sortBy(x => (x._2, x._1)) poolAdjacentViolators(parallelStepResult) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 8066900dfa011..e7408e88064b9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.regression +import org.apache.commons.math3.util.Precision import org.scalatest.matchers.must.Matchers import org.apache.spark.{SparkException, SparkFunSuite} @@ -24,6 +25,24 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.util.Utils +/** + * Tests can be verified through the following python snippet: + * + * {{{ + * from sklearn.isotonic import IsotonicRegression + * + * def test(x, y, x_test, isotonic=True): + * ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y) + * y_test = ir.predict(x_test) + * + * def print_array(label, a): + * print(f"{label}: [{', '.join([str(i) for i in a])}]") + * + * print_array("boundaries", ir.X_thresholds_) + * print_array("predictions", ir.y_thresholds_) + * print_array("y_test", y_test) + * }}} + */ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers { private def round(d: Double) = { @@ -44,8 +63,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w labels: Seq[Double], weights: Seq[Double], isotonic: Boolean): IsotonicRegressionModel = { - val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache() - new IsotonicRegression().setIsotonic(isotonic).run(trainRDD) + runIsotonicRegressionOnInput(generateIsotonicInput(labels, weights), isotonic) } private def runIsotonicRegression( @@ -54,17 +72,37 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic) } + private def runIsotonicRegression( + labels: Seq[Double], + features: Seq[Double], + weights: Seq[Double], + isotonic: Boolean): IsotonicRegressionModel = { + runIsotonicRegressionOnInput( + labels.indices.map(i => (labels(i), features(i), weights(i))), + isotonic) + } + + private def runIsotonicRegressionOnInput( + input: Seq[(Double, Double, Double)], + isotonic: Boolean, + slices: Int = sc.defaultParallelism): IsotonicRegressionModel = { + val trainRDD = sc.parallelize(input, slices).cache() + new IsotonicRegression().setIsotonic(isotonic).run(trainRDD) + } + test("increasing isotonic regression") { /* The following result could be re-produced with sklearn. - > from sklearn.isotonic import IsotonicRegression - > x = range(9) - > y = [1, 2, 3, 1, 6, 17, 16, 17, 18] - > ir = IsotonicRegression(x, y) - > print ir.predict(x) + > test( + > x = range(9), + > y = [1, 2, 3, 1, 6, 17, 16, 17, 18], + > x_test = range(9) + > ) - array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. 
]) + boundaries: [0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] + predictions: [1.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0] + y_test: [1.0, 2.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0] */ val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) @@ -142,9 +180,9 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w } test("isotonic regression with unordered input") { - val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache() + val model = + runIsotonicRegressionOnInput(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, true, 2) - val model = new IsotonicRegression().run(trainRDD) assert(model.predictions === Array(1, 2, 3, 4, 5)) } @@ -159,7 +197,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true) assert(model.boundaries === Array(0, 1, 2, 4)) - assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2)) + assert(model.predictions.map(round) === Array(1, 2, 3.3 / 1.2, 3.3 / 1.2)) } test("weighted isotonic regression with negative weights") { @@ -176,11 +214,20 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w } test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") { - val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1), - (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), - 2) + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + true, + 2) + + assert(model.boundaries === Array(1.0, 3.0)) + assert(model.predictions === Array(0.75, 0.75)) + } - val model = new IsotonicRegression().run(trainRDD) + test("SPARK-41008 isotonic regression with duplicate features differs from sklearn") { + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + true, + 2) assert(model.boundaries === Array(1.0, 3.0)) assert(model.predictions === Array(0.75, 0.75)) @@ -194,32 +241,38 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w assert(model.predict(0.5) === 1.5) assert(model.predict(0.75) === 1.75) assert(model.predict(1) === 2) - assert(model.predict(2) === 10d/3) - assert(model.predict(9) === 10d/3) + assert(model.predict(2) === 10d / 3) + assert(model.predict(9) === 10d / 3) } test("isotonic regression prediction with duplicate features") { - val trainRDD = sc.parallelize( - Seq[(Double, Double, Double)]( - (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache() - val model = new IsotonicRegression().run(trainRDD) - - assert(model.predict(0) === 1) - assert(model.predict(1.5) === 2) - assert(model.predict(2.5) === 4.5) - assert(model.predict(4) === 6) + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), + true, + 2) + + assert(model.boundaries === Array(1.0, 2.0, 3.0)) + assert(model.predictions === Array(1.5, 3.0, 5.5)) + + assert(model.predict(0) === 1.5) + assert(model.predict(1.5) === 2.25) + assert(model.predict(2.5) === 4.25) + assert(model.predict(4) === 5.5) } test("antitonic regression prediction with duplicate features") { - val trainRDD = sc.parallelize( - Seq[(Double, Double, Double)]( - (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache() - val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) - - assert(model.predict(0) 
=== 6) - assert(model.predict(1.5) === 4.5) - assert(model.predict(2.5) === 2) - assert(model.predict(4) === 1) + val model = runIsotonicRegressionOnInput( + Seq((5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), + false, + 2) + + assert(model.boundaries === Array(1.0, 2.0, 3.0)) + assert(model.predictions === Array(5.5, 3.0, 1.5)) + + assert(model.predict(0) === 5.5) + assert(model.predict(1.5) === 4.25) + assert(model.predict(2.5) === 2.25) + assert(model.predict(4) === 1.5) } test("isotonic regression RDD prediction") { @@ -227,7 +280,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache() val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2) - assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) + assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0 / 3, 10.0 / 3)) } test("antitonic regression prediction") { @@ -270,4 +323,59 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false) } } + + test("makeUnique: handle duplicate features") { + val regressor = new IsotonicRegression() + import regressor.makeUnique + import Precision.EPSILON + + // Note: input must be lexicographically sorted by (feature, label) + + // empty + assert(makeUnique(Array.empty) === Array.empty) + + // single + assert(makeUnique(Array((1.0, 1.0, 1.0))) === Array((1.0, 1.0, 1.0))) + + // two and duplicate + assert(makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0, 1.0))) === Array((1.0, 1.0, 2.0))) + + // two and unique + assert( + makeUnique(Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0))) === + Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0))) + + // generic with duplicates + assert( + makeUnique( + Array( + (10.0, 1.0, 1.0), (20.0, 1.0, 1.0), + (10.0, 2.0, 1.0), (20.0, 2.0, 1.0), (30.0, 2.0, 1.0), + (10.0, 3.0, 1.0) + )) === Array((15.0, 1.0, 2.0), (20.0, 2.0, 3.0), (10.0, 3.0, 1.0))) + + // generic unique + assert( + makeUnique(Array((10.0, 1.0, 1.0), (10.0, 2.0, 1.0), (10.0, 3.0, 1.0))) === Array( + (10.0, 1.0, 1.0), + (10.0, 2.0, 1.0), + (10.0, 3.0, 1.0))) + + // generic with duplicates and non-uniform weights + assert( + makeUnique( + Array( + (10.0, 1.0, 0.3), (20.0, 1.0, 0.7), + (10.0, 2.0, 0.3), (20.0, 2.0, 0.3), (30.0, 2.0, 0.4), + (10.0, 3.0, 1.0) + )) === Array( + (10.0 * 0.3 + 20.0 * 0.7, 1.0, 1.0), + (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0), + (10.0, 3.0, 1.0))) + + // duplicate up to resolution error + assert( + makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) === + Array((1.0, 1.0, 3.0))) + } } From 107928dafafe12e0cfb80197fe314355c539903f Mon Sep 17 00:00:00 2001 From: Ahmed Mahran Date: Wed, 7 Dec 2022 18:19:51 +0200 Subject: [PATCH 2/2] Apply code review comments on PR 38966 https://github.com/apache/spark/pull/38966 --- .../mllib/regression/IsotonicRegression.scala | 91 ++++++++++--------- .../regression/IsotonicRegressionSuite.scala | 4 +- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 09054f9b7b0e4..0b2bf14750168 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -336,47 
+336,13 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
       weight > 0
     }
 
-    if (cleanInput.isEmpty) Array.empty
-    else if (cleanInput.length <= 1) cleanInput
-    else {
+    if (cleanInput.length <= 1) {
+      cleanInput
+    } else {
       // Whether two double features are equal up to a given precision.
       @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)
 
-      // Utility object, holds a buffer of all points with unique features so far, and performs
-      // weighted sum accumulation of points. Hides these details for better readability of the
-      // main algorithm.
-      object PointsAccumulator {
-        private val output = ArrayBuffer[(Double, Double, Double)]()
-        private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
-          (0d, 0d, 0d)
-
-        /** Resets the current value of the point accumulator using the provided point. */
-        def :=(point: (Double, Double, Double)): Unit = {
-          val (label, feature, weight) = point
-          currentLabel = label * weight
-          currentFeature = feature * weight
-          currentWeight = weight
-        }
-
-        /** Accumulates the provided point into the current value of the point accumulator. */
-        def +=(point: (Double, Double, Double)): Unit = {
-          val (label, feature, weight) = point
-          currentLabel += label * weight
-          currentFeature += feature * weight
-          currentWeight += weight
-        }
-
-        /** Appends the current value of the point accumulator to the output. */
-        def appendToOutput(): Unit =
-          output += ((
-            currentLabel / currentWeight,
-            currentFeature / currentWeight,
-            currentWeight))
-
-        /** Returns all accumulated points so far. */
-        def getOutput: Array[(Double, Double, Double)] = output.toArray
-      }
-
+      val pointsAccumulator = new IsotonicRegression.PointsAccumulator
       var (_, prevFeature, _) = cleanInput.head
 
@@ -387,16 +353,16 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
       // we compute the weighted average of the features.
       cleanInput.foreach { case point @ (_, feature, _) =>
         if (areEqual(feature, prevFeature)) {
-          PointsAccumulator += point
+          pointsAccumulator += point
         } else {
-          PointsAccumulator.appendToOutput()
-          PointsAccumulator := point
+          pointsAccumulator.appendToOutput()
+          pointsAccumulator := point
         }
         prevFeature = feature
       }
       // Append the last accumulated point
-      PointsAccumulator.appendToOutput()
-      PointsAccumulator.getOutput
+      pointsAccumulator.appendToOutput()
+      pointsAccumulator.getOutput
     }
   }
 
@@ -533,3 +499,42 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     poolAdjacentViolators(parallelStepResult)
   }
 }
+
+object IsotonicRegression {
+  /**
+   * Utility class that holds a buffer of all points with unique features so far and performs
+   * weighted-sum accumulation of points. Hiding these details improves readability of the
+   * main algorithm.
+   */
+  class PointsAccumulator {
+    private val output = ArrayBuffer[(Double, Double, Double)]()
+    private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
+      (0d, 0d, 0d)
+
+    /** Resets the current value of the point accumulator using the provided point. */
+    def :=(point: (Double, Double, Double)): Unit = {
+      val (label, feature, weight) = point
+      currentLabel = label * weight
+      currentFeature = feature * weight
+      currentWeight = weight
+    }
+
+    /** Accumulates the provided point into the current value of the point accumulator. 
*/ + def +=(point: (Double, Double, Double)): Unit = { + val (label, feature, weight) = point + currentLabel += label * weight + currentFeature += feature * weight + currentWeight += weight + } + + /** Appends the current value of the point accumulator to the output. */ + def appendToOutput(): Unit = + output += (( + currentLabel / currentWeight, + currentFeature / currentWeight, + currentWeight)) + + /** Returns all accumulated points so far. */ + def getOutput: Array[(Double, Double, Double)] = output.toArray + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index e7408e88064b9..b59d16be6cd0a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -241,8 +241,8 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w assert(model.predict(0.5) === 1.5) assert(model.predict(0.75) === 1.75) assert(model.predict(1) === 2) - assert(model.predict(2) === 10d / 3) - assert(model.predict(9) === 10d / 3) + assert(model.predict(2) === 10.0 / 3) + assert(model.predict(9) === 10.0 / 3) } test("isotonic regression prediction with duplicate features") {
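
Reviewer note (not part of the patch): the dedup step can be sanity-checked outside Spark with the following minimal, self-contained sketch. The names here (MakeUniqueSketch, the eps parameter) are hypothetical, and the sketch deliberately uses a simple absolute-epsilon comparison instead of the ulp-based org.apache.commons.math3.util.Precision.equals that the patch uses, so it only approximates the patch's equality test:

{{{
import scala.collection.mutable.ArrayBuffer

object MakeUniqueSketch {
  // A point is (label, feature, weight); input is assumed sorted by (feature, label).
  type Point = (Double, Double, Double)

  // Merge each run of (approximately) equal feature values into one point whose label and
  // feature are the weighted averages over the run and whose weight is the sum over the run.
  def makeUnique(input: Seq[Point], eps: Double = 1e-12): Seq[Point] = {
    if (input.length <= 1) input
    else {
      val out = ArrayBuffer.empty[Point]
      var (labelSum, featureSum, weightSum) = (0.0, 0.0, 0.0)
      var prevFeature = input.head._2
      def flush(): Unit = out += ((labelSum / weightSum, featureSum / weightSum, weightSum))
      input.foreach { case (y, x, w) =>
        if (math.abs(x - prevFeature) > eps) { // a new run starts: emit the accumulated one
          flush()
          labelSum = 0.0; featureSum = 0.0; weightSum = 0.0
        }
        labelSum += y * w; featureSum += x * w; weightSum += w
        prevFeature = x
      }
      flush() // the last run is still pending
      out.toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    // Input of the SPARK-16426 test, sorted by (feature, label).
    val points = Seq[Point]((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1))
      .sortBy(p => (p._2, p._1))
    // Expected: (1.5, 1.0, 2.0), (0.5, 2.0, 2.0), (0.25, 3.0, 2.0)
    makeUnique(points).foreach(println)
  }
}
}}}

On the duplicate-feature input used by the SPARK-16426 and SPARK-41008 tests, this yields the deduplicated points (1.5, 1.0, 2.0), (0.5, 2.0, 2.0), (0.25, 3.0, 2.0) that the PAV step then receives.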
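For completeness, here is an equally hypothetical stack-based pool-adjacent-violators sketch. It is simpler than, but equivalent in spirit to, the index-array formulation in poolAdjacentViolators; merging on >= compresses equal-average neighbours exactly as the patch's comment describes:

{{{
object PavSketch {
  // Isotonic PAV over deduplicated (label, feature, weight) points sorted by feature:
  // keep a stack of blocks and merge while adjacent block averages violate monotonicity.
  def pav(points: Seq[(Double, Double, Double)]): Seq[(Double, Double, Double)] = {
    // A block is (sum of weight * label, sum of weights, first feature, last feature).
    val blocks = scala.collection.mutable.ArrayBuffer.empty[(Double, Double, Double, Double)]
    def avg(b: (Double, Double, Double, Double)): Double = b._1 / b._2
    points.foreach { case (y, x, w) =>
      blocks += ((y * w, w, x, x))
      while (blocks.length > 1 && avg(blocks(blocks.length - 2)) >= avg(blocks.last)) {
        val right = blocks.remove(blocks.length - 1)
        val left = blocks.remove(blocks.length - 1)
        blocks += ((left._1 + right._1, left._2 + right._2, left._3, right._4))
      }
    }
    // Emit each block as its two endpoints with half the weight each, or as a single point.
    blocks.toSeq.flatMap { case b @ (_, w, x0, x1) =>
      if (x1 > x0) Seq((avg(b), x0, w / 2), (avg(b), x1, w / 2)) else Seq((avg(b), x0, w))
    }
  }

  def main(args: Array[String]): Unit = {
    // Deduplicated points from the sketch above; expected output:
    // (0.75, 1.0, 3.0), (0.75, 3.0, 3.0) -- i.e. boundaries (1.0, 3.0) and
    // predictions (0.75, 0.75), matching the duplicate-feature tests in this patch.
    pav(Seq((1.5, 1.0, 2.0), (0.5, 2.0, 2.0), (0.25, 3.0, 2.0))).foreach(println)
  }
}
}}}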