From f0571890522eb09d092f01e8b5a208d4849e56df Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Thu, 21 Aug 2014 15:34:15 -0700 Subject: [PATCH 1/8] add example HuberRobustRegression.scala --- .../mllib/HuberRobustRegression.scala | 134 +++++++++++++++ .../spark/mllib/optimization/Gradient.scala | 58 +++++++ .../mllib/regression/RobustRegression.scala | 159 ++++++++++++++++++ .../regression/RobustRegressionSuite.scala | 127 ++++++++++++++ 4 files changed, 478 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala new file mode 100644 index 0000000000000..6d7c4520202fc --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.regression.HuberRobustRegressionWithSGD +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} + +/** + * An example app for Huber Robust regression. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression + * }}} + * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
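For readers who just want the shape of the API this patch introduces, here is a condensed sketch of the same flow, assuming the HuberRobustRegressionWithSGD.train helper added later in this patch; the object name and the RMSE report are illustrative, not part of the example that follows.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.HuberRobustRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

// Hypothetical quick-start driver; the full scopt-based example app follows below.
object HuberRobustQuickStart {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HuberRobustQuickStart"))
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_linear_regression_data.txt").cache()
    // Default SGD settings from this patch: step size 1.0 and a full mini-batch per iteration.
    val model = HuberRobustRegressionWithSGD.train(data, 100)
    val n = data.count()
    val squaredError = data.map { p =>
      val err = model.predict(p.features) - p.label
      err * err
    }.reduce(_ + _)
    println(s"Training RMSE = ${math.sqrt(squaredError / n)}")
    sc.stop()
  }
}
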
+ */ +object HuberRobustRegression extends App { + + object RegType extends Enumeration { + type RegType = Value + val NONE, L1, L2 = Value + } + + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) + + val defaultParams = Params() + + val parser = new OptionParser[Params]("HuberRobustRegression") { + head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \ + | examples/target/scala-*/spark-examples-*.jar \ + | data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case NONE => new SimpleUpdater() + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val algorithm = new HuberRobustRegressionWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + + val model = algorithm.run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val loss = predictionAndLabel.map { case (p, l) => + val err = p - l + err * err + }.reduce(_ + _) + val rmse = math.sqrt(loss / numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index fdd67160114ca..17af6c77b0cda 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} +import scala.math.pow /** * :: DeveloperApi :: @@ -118,6 +119,63 @@ class LeastSquaresGradient extends Gradient { } } +/** + * :: DeveloperApi :: 
+ * Compute gradient and loss for a M-estimation Huber objective function, as used in robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal in the centre but like + * a double exponential distribution in the tails (Hogg 1979: 109). + * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * See also the documentation for the precise formulation. + */ +@DeveloperApi +class HuberRobustGradient extends Gradient { + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val diff = dot(data, weights) - label + val loss = diff * diff + val gradient = data.copy + val k = 1.345 + //Tuning constant is generally picked to give reasonably high efficiency in the normal case. + //Smaller values produce more resistance to outliers while at the expense of lower efficiency when the errors are normally distributed. + if(diff < -k){ + scal(-k, gradient) + (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) + }else if(diff >= -k && diff <= k){ + scal(diff, gradient) + (gradient, (1.0 / 2.0 * pow(diff, 2))) + }else { + scal(k, gradient) + (gradient, (k * diff - 1.0 / 2.0 * pow(k, 2))) + } + } + + override def compute( + data: Vector, + label: Double, + weights: Vector, + cumGradient: Vector): Double = { + val diff = dot(data, weights) - label + val k = 1.345 + // + if(diff < -k){ + axpy(-k, data, cumGradient) + }else if(diff >= -k && diff <= k){ + axpy(diff, data, cumGradient) + }else { + axpy(k, data, cumGradient) + } + + if(diff < -k){ + -k * diff - 1.0 / 2.0 * pow(k, 2) + }else if(diff >= -k && diff <= k){ + 1.0 / 2.0 * pow(diff, 2) + }else { + k * diff - 1.0 /2.0 * pow(k, 2) + } + } +} + /** * :: DeveloperApi :: * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala new file mode 100644 index 0000000000000..1b7039e444f43 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.optimization._ +import org.apache.spark.annotation.Experimental + +/** + * Regression model trained using Huber M-Estimation RobustRegression. + * + * @param weights Weights computed for every feature. 
+ * @param intercept Intercept computed for this model. + */ +class HuberRobustRegressionModel ( + override val weights: Vector, + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { + + override protected def predictPoint( + dataMatrix: Vector, + weightMatrix: Vector, + intercept: Double): Double = { + weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept + } +} + +/** + * Train a Huber M-Estimation Robust regression model with no regularization using Stochastic Gradient Descent. + * This solves the Huber objective function + * f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + * its corresponding right hand side label y. + * See also the documentation for the precise formulation. + */ +class HuberRobustRegressionWithSGD private[mllib] ( + private var stepSize: Double, + private var numIterations: Int, + private var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable { + + private val gradient = new HuberRobustGradient() + private val updater = new SimpleUpdater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a Huber M-Estimation RobustRegression object with default parameters: {stepSize: 1.0, + * numIterations: 100, miniBatchFraction: 1.0}. + */ + def this() = this(1.0, 100, 1.0) + + override protected def createModel(weights: Vector, intercept: Double) = { + new HuberRobustRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling HuberRobustRegression. + */ +object HuberRobustRegressionWithSGD { + + /** + * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used + * in gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Vector): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction) + .run(input, initialWeights) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. + * + * @param input RDD of (label, array of features) pairs. 
Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double): HuberRobustRegressionModel = { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int): HuberRobustRegressionModel = { + train(input, numIterations, 1.0, 1.0) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala new file mode 100644 index 0000000000000..ca354421cd72f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.regression + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.{LocalClusterSparkContext, LinearDataGenerator, + LocalSparkContext} + +class RobustRegressionSuite extends FunSuite with LocalSparkContext { + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("Huber Robust regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(true) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X2 + test("Huber Robust regression without intercept") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. 
+ validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X10000 + test("sparse Huber Robust regression without intercept") { + val denseRDD = sc.parallelize( + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2) + val sparseRDD = denseRDD.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + }.cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(sparseRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 10000) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(9999) >= 9.0 && weights(9999) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) + val sparseValidationData = validationData.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + } + val sparseValidationRDD = sc.parallelize(sparseValidationData, 2) + + // Test prediction on RDD. + validatePrediction( + model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData) + + // Test prediction on Array. + validatePrediction( + sparseValidationData.map(row => model.predict(row.features)), sparseValidationData) + } +} From 38c1b3c0f7dd2a6f62eae2fecb006495ffc8a064 Mon Sep 17 00:00:00 2001 From: fanjiang Date: Fri, 22 Aug 2014 02:49:41 -0700 Subject: [PATCH 2/8] add new class HuberRobustGradient and RobustRegression.scala --- .../spark/mllib/optimization/Gradient.scala | 18 ++++++++----- .../mllib/regression/RobustRegression.scala | 25 ++++++++++--------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 17af6c77b0cda..7834e683193c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -121,12 +121,13 @@ class LeastSquaresGradient extends Gradient { /** * :: DeveloperApi :: - * Compute gradient and loss for a M-estimation Huber objective function, as used in robust regression. - * The Huber M-estimator corresponds to a probability distribution for the errors which is normal in the centre but like - * a double exponential distribution in the tails (Hogg 1979: 109). + * Compute gradient and loss for Huber objective function, as used in robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal + * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109). * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k - * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. * See also the documentation for the precise formulation. 
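As a plain-Scala illustration of the piecewise definition above, independent of the Spark classes in this patch, here are the scalar Huber loss and its derivative in the residual r = (A weights - y); the derivative is what the compute() methods below accumulate, scaled by the data vector.

// Standalone sketch (not part of the patch) of the loss that HuberRobustGradient implements.
object HuberLossSketch {
  val k = 1.345 // tuning constant used throughout this patch

  // Piecewise Huber loss as a function of the residual r.
  def loss(r: Double): Double =
    if (math.abs(r) <= k) 0.5 * r * r
    else k * math.abs(r) - 0.5 * k * k

  // Derivative w.r.t. r: the residual in the quadratic region, clipped to +/- k in the tails.
  def dLoss(r: Double): Double =
    if (r < -k) -k else if (r > k) k else r

  def main(args: Array[String]): Unit = {
    // Inside |r| <= k the loss coincides with the squared loss ...
    println(loss(1.0))   // 0.5, same as 0.5 * 1.0^2
    // ... while a gross residual is penalized only linearly, which is what resists outliers.
    println(loss(10.0))  // ~12.55, versus 50.0 for the squared loss
    println(dLoss(10.0)) // 1.345, versus 10.0 for the squared-loss derivative
  }
}

The same clipping shows up in the class below: scal(diff, gradient) inside the band, scal(-k, gradient) or scal(k, gradient) outside it.
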
*/ @DeveloperApi @@ -136,8 +137,13 @@ class HuberRobustGradient extends Gradient { val loss = diff * diff val gradient = data.copy val k = 1.345 - //Tuning constant is generally picked to give reasonably high efficiency in the normal case. - //Smaller values produce more resistance to outliers while at the expense of lower efficiency when the errors are normally distributed. + + /** + * Tuning constant is generally picked to give reasonably high efficiency in the normal case. + * Smaller values produce more resistance to outliers while at the expense of + * lower efficiency when the errors are normally distributed. + */ + if(diff < -k){ scal(-k, gradient) (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index 1b7039e444f43..4e50ccb1c8dc3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -42,11 +42,12 @@ class HuberRobustRegressionModel ( } /** - * Train a Huber M-Estimation Robust regression model with no regularization using Stochastic Gradient Descent. + * Train a Huber Robust regression model with no regularization using Stochastic Gradient Descent. * This solves the Huber objective function * f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k * f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k - * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with * its corresponding right hand side label y. * See also the documentation for the precise formulation. @@ -81,8 +82,8 @@ class HuberRobustRegressionWithSGD private[mllib] ( object HuberRobustRegressionWithSGD { /** - * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. Each iteration uses + * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used * in gradient descent are initialized using the initial weights provided. * @@ -105,8 +106,8 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. Each iteration uses + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data @@ -124,9 +125,9 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. 
We use the entire data set to - * compute the true gradient in each iteration. + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. We use the entire + * data set to compute the true gradient in each iteration. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data * matrix A as well as the corresponding right hand side label y @@ -142,9 +143,9 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using a step size of 1.0. We use the entire data set to - * compute the true gradient in each iteration. + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using a step size of 1.0. We use the entire data + * set to compute the true gradient in each iteration. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data * matrix A as well as the corresponding right hand side label y From 6f58e8082b1f495134cbaaff104c91b6771fbedf Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Wed, 3 Sep 2014 22:11:04 -0700 Subject: [PATCH 3/8] add HuberRobustRegressionWithLBFGS and an example --- .../examples/mllib/HuberRobustRegression.scala | 5 ++--- .../mllib/regression/RobustRegression.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala index 6d7c4520202fc..192bcf62091e7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -21,7 +21,7 @@ import org.apache.log4j.{Level, Logger} import scopt.OptionParser import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.regression.HuberRobustRegressionWithSGD +import org.apache.spark.mllib.regression.HuberRobustRegressionWithLBFGS import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} @@ -109,10 +109,9 @@ object HuberRobustRegression extends App { case L2 => new SquaredL2Updater() } - val algorithm = new HuberRobustRegressionWithSGD() + val algorithm = new HuberRobustRegressionWithLBFGS() algorithm.optimizer .setNumIterations(params.numIterations) - .setStepSize(params.stepSize) .setUpdater(updater) .setRegParam(params.regParam) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index 4e50ccb1c8dc3..fc972064c47c3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -158,3 +158,19 @@ object HuberRobustRegressionWithSGD { train(input, numIterations, 1.0, 1.0) } } + +/** + * Train a classification model for HuberRobust Regression using Limited-memory BFGS. + * Standard feature scaling and L2 regularization are used by default. 
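A minimal usage sketch for the new class, mirroring how the example app in this same commit configures its optimizer; the wrapper object and the parameter values are illustrative.

import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.{HuberRobustRegressionModel, HuberRobustRegressionWithLBFGS, LabeledPoint}
import org.apache.spark.rdd.RDD

// Hypothetical helper showing the LBFGS-based trainer end to end.
object HuberLBFGSSketch {
  def fit(training: RDD[LabeledPoint]): HuberRobustRegressionModel = {
    val algorithm = new HuberRobustRegressionWithLBFGS()
    algorithm.optimizer
      .setNumIterations(100)             // LBFGS takes no step size, unlike the SGD variant
      .setUpdater(new SquaredL2Updater()) // same L2 regularization as the class default
      .setRegParam(0.1)
    algorithm.run(training)
  }
}

The SGD variant above remains available when an explicit step size and mini-batch fraction are preferred.
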
+ */ +class HuberRobustRegressionWithLBFGS + extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable { + + this.setFeatureScaling(true) + + override val optimizer = new LBFGS(new HuberRobustGradient, new SquaredL2Updater) + + override protected def createModel(weights: Vector, intercept: Double) = { + new HuberRobustRegressionModel(weights, intercept) + } +} \ No newline at end of file From ab0f53946b6b43fa8934f112c0c5b11e2224dfd8 Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Wed, 3 Sep 2014 22:17:09 -0700 Subject: [PATCH 4/8] adjust code style of HuberRobustRegressionWithLBFGS --- .../org/apache/spark/mllib/regression/RobustRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index fc972064c47c3..fa06742c8dbe1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -173,4 +173,4 @@ class HuberRobustRegressionWithLBFGS override protected def createModel(weights: Vector, intercept: Double) = { new HuberRobustRegressionModel(weights, intercept) } -} \ No newline at end of file +} From b198c18be4a22fddbdb4c840ccbb66081162cefc Mon Sep 17 00:00:00 2001 From: Fan Jiang Date: Thu, 4 Sep 2014 11:07:31 -0700 Subject: [PATCH 5/8] Update Gradient.scala --- .../org/apache/spark/mllib/optimization/Gradient.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 7834e683193c0..c9b21f20040a9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -149,7 +149,7 @@ class HuberRobustGradient extends Gradient { (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) }else if(diff >= -k && diff <= k){ scal(diff, gradient) - (gradient, (1.0 / 2.0 * pow(diff, 2))) + (gradient, (1.0 / 2.0 * loss)) }else { scal(k, gradient) (gradient, (k * diff - 1.0 / 2.0 * pow(k, 2))) @@ -162,8 +162,8 @@ class HuberRobustGradient extends Gradient { weights: Vector, cumGradient: Vector): Double = { val diff = dot(data, weights) - label + val loss = diff * diff val k = 1.345 - // if(diff < -k){ axpy(-k, data, cumGradient) }else if(diff >= -k && diff <= k){ @@ -171,11 +171,10 @@ class HuberRobustGradient extends Gradient { }else { axpy(k, data, cumGradient) } - if(diff < -k){ -k * diff - 1.0 / 2.0 * pow(k, 2) }else if(diff >= -k && diff <= k){ - 1.0 / 2.0 * pow(diff, 2) + 1.0 / 2.0 * loss }else { k * diff - 1.0 /2.0 * pow(k, 2) } From 9d6384fb8c8fe9fec2279e611781c11116d6590b Mon Sep 17 00:00:00 2001 From: Fan Jiang Date: Thu, 4 Sep 2014 11:31:26 -0700 Subject: [PATCH 6/8] Update Gradient.scala --- .../scala/org/apache/spark/mllib/optimization/Gradient.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index c9b21f20040a9..50b43b33ffbbd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -164,6 +164,7 @@ class HuberRobustGradient extends Gradient { val diff = dot(data, 
weights) - label val loss = diff * diff val k = 1.345 + if(diff < -k){ axpy(-k, data, cumGradient) }else if(diff >= -k && diff <= k){ From e56178cf44e00f760cc21d755e848f42a54e074e Mon Sep 17 00:00:00 2001 From: Fan Jiang Date: Fri, 5 Sep 2014 10:56:07 -0700 Subject: [PATCH 7/8] Update HuberRobustRegression.scala --- .../org/apache/spark/examples/mllib/HuberRobustRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala index 192bcf62091e7..b939dc463725a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} import scopt.OptionParser - import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.regression.HuberRobustRegressionWithLBFGS import org.apache.spark.mllib.util.MLUtils @@ -85,6 +84,7 @@ object HuberRobustRegression extends App { sys.exit(1) } +// If no parameter set SimpleUpdater will be used. def run(params: Params) { val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params") val sc = new SparkContext(conf) From 9e34bd2b536e3802045cc13bd1644cfa3f7a2661 Mon Sep 17 00:00:00 2001 From: Fan Jiang Date: Fri, 5 Sep 2014 14:48:00 -0700 Subject: [PATCH 8/8] Update HuberRobustRegression.scala --- .../org/apache/spark/examples/mllib/HuberRobustRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala index b939dc463725a..d0d334e853e98 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -84,7 +84,7 @@ object HuberRobustRegression extends App { sys.exit(1) } -// If no parameter set SimpleUpdater will be used. + // If no parameter set SimpleUpdater will be used. def run(params: Params) { val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params") val sc = new SparkContext(conf)
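Taken together, the series adds the Huber gradient, the SGD and LBFGS trainers, an example app, and a test suite. As a closing sketch, not part of any patch and built on the same LinearDataGenerator the tests use, one way to see the estimator's point is to corrupt a few labels and compare the recovered weights against plain least squares; the hedged expectation is that the Huber fit stays near the true weights while the squared-loss fit is dragged by the outliers.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.regression.{HuberRobustRegressionWithSGD, LinearRegressionWithSGD}
import org.apache.spark.mllib.util.LinearDataGenerator

// Illustrative comparison only; class name and settings are not part of the patch series.
object HuberVsLeastSquares {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("HuberVsLeastSquares").setMaster("local[2]"))

    // Y = 10*X1 + 10*X2, as in the "without intercept" test, plus a few gross outliers.
    val clean = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 500, 42)
    val corrupted = clean.zipWithIndex.map { case (p, i) =>
      if (i % 50 == 0) p.copy(label = p.label + 1000.0) else p
    }
    val data = sc.parallelize(corrupted, 2).cache()

    val huber = HuberRobustRegressionWithSGD.train(data, 1000, 1.0)
    val leastSquares = LinearRegressionWithSGD.train(data, 1000, 1.0)

    println(s"Huber weights:         ${huber.weights}")  // expected to stay near (10, 10)
    println(s"Least-squares weights: ${leastSquares.weights}")

    sc.stop()
  }
}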