@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.HuberRobustRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater}

/**
* An example app for Huber robust regression. Run with
* {{{
* bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression
* }}}
* A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`.
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object HuberRobustRegression extends App {

object RegType extends Enumeration {
type RegType = Value
val NONE, L1, L2 = Value
}

import RegType._

case class Params(
input: String = null,
numIterations: Int = 100,
stepSize: Double = 1.0,
regType: RegType = L2,
regParam: Double = 0.1)

val defaultParams = Params()

val parser = new OptionParser[Params]("HuberRobustRegression") {
head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.")
opt[Int]("numIterations")
.text("number of iterations")
.action((x, c) => c.copy(numIterations = x))
opt[Double]("stepSize")
.text(s"initial step size, default: ${defaultParams.stepSize}")
.action((x, c) => c.copy(stepSize = x))
opt[String]("regType")
.text(s"regularization type (${RegType.values.mkString(",")}), " +
s"default: ${defaultParams.regType}")
.action((x, c) => c.copy(regType = RegType.withName(x)))
opt[Double]("regParam")
.text(s"regularization parameter, default: ${defaultParams.regParam}")
arg[String]("<input>")
.required()
.text("input paths to labeled examples in LIBSVM format")
.action((x, c) => c.copy(input = x))
note(
"""
|For example, the following command runs this app on a synthetic dataset:
|
| bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \
| examples/target/scala-*/spark-examples-*.jar \
| data/mllib/sample_linear_regression_data.txt
""".stripMargin)
}

parser.parse(args, defaultParams).map { params =>
run(params)
} getOrElse {
sys.exit(1)
}

// If regType is NONE, the SimpleUpdater (no regularization) is used.
def run(params: Params) {
val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params")
val sc = new SparkContext(conf)

Logger.getRootLogger.setLevel(Level.WARN)

val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

val splits = examples.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
val test = splits(1).cache()

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")

examples.unpersist(blocking = false)

val updater = params.regType match {
case NONE => new SimpleUpdater()
case L1 => new L1Updater()
case L2 => new SquaredL2Updater()
}

val algorithm = new HuberRobustRegressionWithLBFGS()
algorithm.optimizer
.setNumIterations(params.numIterations)
.setUpdater(updater)
.setRegParam(params.regParam)

val model = algorithm.run(training)

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

val loss = predictionAndLabel.map { case (p, l) =>
val err = p - l
err * err
}.reduce(_ + _)
val rmse = math.sqrt(loss / numTest)

println(s"Test RMSE = $rmse.")

sc.stop()
}
}
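Usage note (an illustrative sketch, not part of the patch): because `Params`, `RegType`, and `run` are public members of the example object, the app can also be driven programmatically, for instance from a test. The values below are hypothetical, and the Spark master is assumed to be supplied externally (e.g. via `spark-submit` or the `spark.master` property).

import org.apache.spark.examples.mllib.HuberRobustRegression
import org.apache.spark.examples.mllib.HuberRobustRegression.RegType

// Hypothetical parameter values; RegType.L1 selects the L1Updater branch in run().
val params = HuberRobustRegression.Params(
  input = "data/mllib/sample_linear_regression_data.txt",
  numIterations = 50,
  regType = RegType.L1,
  regParam = 0.01)
HuberRobustRegression.run(params)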
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal}
import scala.math.pow

/**
* :: DeveloperApi ::
@@ -118,6 +119,69 @@ class LeastSquaresGradient extends Gradient {
}
}

/**
 * :: DeveloperApi ::
 * Compute gradient and loss for the Huber objective function, as used in robust regression.
 * The Huber M-estimator corresponds to a probability distribution for the errors which is
 * normal in the centre but like a double exponential distribution in the tails (Hogg 1979: 109).
 *   L = 1/2 (A weights - y)^2        if |A weights - y| <= k
 *   L = k |A weights - y| - 1/2 k^2  if |A weights - y| > k
 * where k = 1.345, which produces 95% efficiency when the errors are normal and
 * substantial resistance to outliers otherwise.
 * See also the documentation for the precise formulation.
 */
@DeveloperApi
class HuberRobustGradient extends Gradient {
  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val diff = dot(data, weights) - label
    val gradient = data.copy

    /**
     * The tuning constant k is generally picked to give reasonably high efficiency in the
     * normal case. Smaller values produce more resistance to outliers, at the expense of
     * lower efficiency when the errors are normally distributed.
     */
    val k = 1.345

    if (diff < -k) {
      scal(-k, gradient)
      (gradient, -k * diff - 1.0 / 2.0 * pow(k, 2))
    } else if (diff <= k) {
      scal(diff, gradient)
      (gradient, 1.0 / 2.0 * diff * diff)
    } else {
      scal(k, gradient)
      (gradient, k * diff - 1.0 / 2.0 * pow(k, 2))
    }
  }

  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    val diff = dot(data, weights) - label
    val k = 1.345

    if (diff < -k) {
      axpy(-k, data, cumGradient)
      -k * diff - 1.0 / 2.0 * pow(k, 2)
    } else if (diff <= k) {
      axpy(diff, data, cumGradient)
      1.0 / 2.0 * diff * diff
    } else {
      axpy(k, data, cumGradient)
      k * diff - 1.0 / 2.0 * pow(k, 2)
    }
  }
}
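To make the piecewise definition concrete, here is a small sanity-check sketch (illustrative, not part of the patch) exercising both regimes of HuberRobustGradient on a one-dimensional example; the expected losses follow directly from the formulas above with k = 1.345.

import org.apache.spark.mllib.linalg.Vectors

val g = new HuberRobustGradient()
val x = Vectors.dense(1.0)  // single feature
val w = Vectors.dense(2.0)  // weight, so the prediction is always 2.0

// Quadratic region: diff = 2.0 - 1.0 = 1.0 <= k, so loss = diff^2 / 2 = 0.5
// and the gradient is diff * x = (1.0).
val (quadGrad, quadLoss) = g.compute(x, 1.0, w)

// Linear region: diff = 2.0 - (-3.0) = 5.0 > k, so loss = k * 5.0 - k^2 / 2
// and the gradient is clipped to k * x = (1.345).
val (linGrad, linLoss) = g.compute(x, -3.0, w)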

/**
* :: DeveloperApi ::
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.annotation.Experimental

/**
* Regression model trained using Huber M-estimation robust regression.
*
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
class HuberRobustRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

override protected def predictPoint(
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double): Double = {
weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
}
}

/**
 * Train a Huber robust regression model with no regularization using Stochastic Gradient Descent.
 * This solves the Huber objective function
 *   f(weights) = 1/2 (A weights - y)^2        if |A weights - y| <= k
 *   f(weights) = k |A weights - y| - 1/2 k^2  if |A weights - y| > k
 * where k = 1.345, which produces 95% efficiency when the errors are normal and
 * substantial resistance to outliers otherwise.
 * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
 * its corresponding right hand side label y.
 * See also the documentation for the precise formulation.
 */
class HuberRobustRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable {

private val gradient = new HuberRobustGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setMiniBatchFraction(miniBatchFraction)

/**
* Construct a HuberRobustRegression object with default parameters: {stepSize: 1.0,
* numIterations: 100, miniBatchFraction: 1.0}.
*/
def this() = this(1.0, 100, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
new HuberRobustRegressionModel(weights, intercept)
}
}

/**
* Top-level methods for calling HuberRobustRegression.
*/
object HuberRobustRegressionWithSGD {

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
* in gradient descent are initialized using the initial weights provided.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Vector): HuberRobustRegressionModel = {
new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
.run(input, initialWeights)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): HuberRobustRegressionModel = {
new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. We use the entire
* data set to compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param numIterations Number of iterations of gradient descent to run.
* @return a HuberRobustRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double): HuberRobustRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}

/**
* Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using a step size of 1.0. We use the entire data
* set to compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @return a HuberRobustRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int): HuberRobustRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}
}
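A minimal end-to-end sketch of the companion object above (assuming an existing SparkContext `sc`; the dataset path mirrors the example app and is illustrative):

import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_linear_regression_data.txt").cache()
// Simplest overload: step size 1.0 and the full data set per iteration.
val model = HuberRobustRegressionWithSGD.train(data, numIterations = 100)
// Pair predictions with labels for evaluation, as the example app does.
val predictionAndLabel = model.predict(data.map(_.features)).zip(data.map(_.label))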

/**
 * Train a regression model for Huber robust regression using Limited-memory BFGS.
 * Standard feature scaling and L2 regularization are used by default.
 */
class HuberRobustRegressionWithLBFGS
extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable {

this.setFeatureScaling(true)

override val optimizer = new LBFGS(new HuberRobustGradient, new SquaredL2Updater)

override protected def createModel(weights: Vector, intercept: Double) = {
new HuberRobustRegressionModel(weights, intercept)
}
}
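And a corresponding sketch for the L-BFGS variant with a non-default updater, mirroring the configuration pattern used in the example app (`training` is assumed to be an RDD[LabeledPoint]; the parameter values are illustrative):

import org.apache.spark.mllib.optimization.L1Updater

val algorithm = new HuberRobustRegressionWithLBFGS()
algorithm.optimizer
  .setNumIterations(50)        // number of L-BFGS iterations
  .setUpdater(new L1Updater()) // swap the default SquaredL2Updater for L1
  .setRegParam(0.01)
val model = algorithm.run(training)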