Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import java.io.Serializable
Expand All @@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.math3.util.Precision
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
Expand Down Expand Up @@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
}

/**
 * Collapses every run of (approximately) equal feature values into one point.
 *
 * For each group of duplicate features the resulting point has:
 *
 *  - label: weighted average of the group's labels
 *  - feature: weighted average of the group's features[1]
 *  - weight: sum of the group's weights
 *
 * [1] Feature values may only be equal up to a floating-point resolution
 * (representation error). Since no single representative can be preferred in that
 * case, the weighted average of the features is used; when all values are exactly
 * equal this reduces to that common value.
 *
 * @param input
 *   Input data of tuples (label, feature, weight). Weights must be non-negative.
 * @return
 *   Points with unique feature values.
 */
private[regression] def makeUnique(
    input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

  // Zero-weight points contribute nothing and are dropped; negative weights are invalid.
  val positiveWeighted = input.filter { case (y, x, weight) =>
    require(
      weight >= 0.0,
      s"Negative weight at point ($y, $x, $weight). Weights must be non-negative")
    weight > 0
  }

  if (positiveWeighted.length <= 1) {
    positiveWeighted
  } else {
    val accumulator = new IsotonicRegression.PointsAccumulator

    // Walk the (sorted) points once, folding every point whose feature is equal --
    // within Precision.equals' default tolerance -- to the previous point's feature
    // into the current accumulated point; a differing feature closes the current
    // group and starts a new one. The accumulated label/feature are the weighted
    // averages over the group.
    var lastFeature = positiveWeighted(0)._2
    var idx = 0
    while (idx < positiveWeighted.length) {
      val point = positiveWeighted(idx)
      val feature = point._2
      if (Precision.equals(feature, lastFeature)) {
        accumulator += point
      } else {
        accumulator.appendToOutput()
        accumulator := point
      }
      lastFeature = feature
      idx += 1
    }
    // Close the final group.
    accumulator.appendToOutput()
    accumulator.getOutput
  }
}

/**
* Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
* described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
Expand All @@ -322,35 +381,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
* 658-672.
*
* @param input Input data of tuples (label, feature, weight). Weights must
be non-negative.
* @param cleanUniqueInput Input data of tuples (label, feature, weight). Features must be unique
* and weights must be non-negative.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val cleanInput = input.filter{ case (y, x, weight) =>
require(
weight >= 0.0,
s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
)
weight > 0
}

if (cleanInput.isEmpty) {
if (cleanUniqueInput.isEmpty) {
return Array.empty
}

// Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
// cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
// Initially, each data point is its own block.
val blockBounds = Array.range(0, cleanInput.length)
val blockBounds = Array.range(0, cleanUniqueInput.length)

// Keep track of the sum of weights and sum of weight * y for each block. weights(start)
// gives the values for the block. Entries that are not at the start of a block
// are meaningless.
val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) =>
(weight, weight * y)
}

Expand Down Expand Up @@ -392,10 +443,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
// Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
// want to compress our output as much as possible. Both give correct results.
var i = 0
while (nextBlock(i) < cleanInput.length) {
while (nextBlock(i) < cleanUniqueInput.length) {
if (average(i) >= average(nextBlock(i))) {
merge(i, nextBlock(i))
while((i > 0) && (average(prevBlock(i)) >= average(i))) {
while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
i = merge(prevBlock(i), i)
}
} else {
Expand All @@ -406,15 +457,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
// construct the output by walking through the blocks in order
val output = ArrayBuffer.empty[(Double, Double, Double)]
i = 0
while (i < cleanInput.length) {
while (i < cleanUniqueInput.length) {
// If block size is > 1, a point at the start and end of the block,
// each receiving half the weight. Otherwise, a single point with
// all the weight.
if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) {
output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2))
} else {
output += ((average(i), cleanInput(i)._2, weights(i)._1))
output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1))
}
i = nextBlock(i)
}
Expand All @@ -434,12 +485,56 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
val keyedInput = input.keyBy(_._2)
val parallelStepResult = keyedInput
// Points with the same or adjacent features must be co-located within the same partition.
.partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
.values
// Lexicographically sort points by features then labels.
.mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
// Aggregate points with equal features into a single point.
.map(makeUnique)
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
// Sort again because collect() doesn't promise ordering.
.sortBy(x => (x._2, x._1))
Comment on lines 492 to +498
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think now it is redundant to sort by labels since we already are making features unique.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can open up a follow-up PR on the same JIRA if there are minor additional improvements or docs to suggest

poolAdjacentViolators(parallelStepResult)
}
}

object IsotonicRegression {
  /**
   * Utility class that accumulates runs of points sharing a feature value into single
   * weighted-average points, buffering each finished point. Hides the bookkeeping from
   * the main algorithm for readability.
   *
   * The running state is the weighted sums sum(w*y), sum(w*x) and sum(w) of the points
   * accumulated so far; `appendToOutput` converts them to a (label, feature, weight)
   * point. Callers must accumulate at least one positive-weight point before calling
   * `appendToOutput`, otherwise the division by the zero weight yields NaN.
   */
  class PointsAccumulator {
    private val output = ArrayBuffer[(Double, Double, Double)]()

    // Plain vars instead of a tuple-pattern `var` definition, which would allocate a
    // Tuple3 and pattern-extract at initialization.
    private var currentLabel: Double = 0d    // running sum of weight * label
    private var currentFeature: Double = 0d  // running sum of weight * feature
    private var currentWeight: Double = 0d   // running sum of weight

    /** Resets the current value of the point accumulator using the provided point. */
    def :=(point: (Double, Double, Double)): Unit = {
      val (label, feature, weight) = point
      currentLabel = label * weight
      currentFeature = feature * weight
      currentWeight = weight
    }

    /** Accumulates the provided point into the current value of the point accumulator. */
    def +=(point: (Double, Double, Double)): Unit = {
      val (label, feature, weight) = point
      currentLabel += label * weight
      currentFeature += feature * weight
      currentWeight += weight
    }

    /** Appends the current value of the point accumulator to the output. */
    def appendToOutput(): Unit =
      output += ((
        currentLabel / currentWeight,
        currentFeature / currentWeight,
        currentWeight))

    /** Returns all accumulated points so far. */
    def getOutput: Array[(Double, Double, Double)] = output.toArray
  }
}
Loading