diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala new file mode 100644 index 0000000000000..d0d334e853e98 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.regression.HuberRobustRegressionWithLBFGS +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} + +/** + * An example app for Huber Robust regression. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression + * }}} + * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object HuberRobustRegression extends App { + + object RegType extends Enumeration { + type RegType = Value + val NONE, L1, L2 = Value + } + + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) + + val defaultParams = Params() + + val parser = new OptionParser[Params]("HuberRobustRegression") { + head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \ + | examples/target/scala-*/spark-examples-*.jar \ + | data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + + // If no parameter set SimpleUpdater will be used. + def run(params: Params) { + val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case NONE => new SimpleUpdater() + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val algorithm = new HuberRobustRegressionWithLBFGS() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setUpdater(updater) + .setRegParam(params.regParam) + + val model = algorithm.run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val loss = predictionAndLabel.map { case (p, l) => + val err = p - l + err * err + }.reduce(_ + _) + val rmse = math.sqrt(loss / numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index fdd67160114ca..50b43b33ffbbd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} +import scala.math.pow /** * :: DeveloperApi :: @@ -118,6 +119,69 @@ class LeastSquaresGradient extends Gradient { } } +/** + * :: DeveloperApi :: + * Compute gradient and loss for Huber objective function, as used in robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal + * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109). + * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. + * See also the documentation for the precise formulation. + */ +@DeveloperApi +class HuberRobustGradient extends Gradient { + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val diff = dot(data, weights) - label + val loss = diff * diff + val gradient = data.copy + val k = 1.345 + + /** + * Tuning constant is generally picked to give reasonably high efficiency in the normal case. + * Smaller values produce more resistance to outliers while at the expense of + * lower efficiency when the errors are normally distributed. + */ + + if(diff < -k){ + scal(-k, gradient) + (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) + }else if(diff >= -k && diff <= k){ + scal(diff, gradient) + (gradient, (1.0 / 2.0 * loss)) + }else { + scal(k, gradient) + (gradient, (k * diff - 1.0 / 2.0 * pow(k, 2))) + } + } + + override def compute( + data: Vector, + label: Double, + weights: Vector, + cumGradient: Vector): Double = { + val diff = dot(data, weights) - label + val loss = diff * diff + val k = 1.345 + + if(diff < -k){ + axpy(-k, data, cumGradient) + }else if(diff >= -k && diff <= k){ + axpy(diff, data, cumGradient) + }else { + axpy(k, data, cumGradient) + } + if(diff < -k){ + -k * diff - 1.0 / 2.0 * pow(k, 2) + }else if(diff >= -k && diff <= k){ + 1.0 / 2.0 * loss + }else { + k * diff - 1.0 /2.0 * pow(k, 2) + } + } +} + /** * :: DeveloperApi :: * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala new file mode 100644 index 0000000000000..fa06742c8dbe1 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.optimization._ +import org.apache.spark.annotation.Experimental + +/** + * Regression model trained using Huber M-Estimation RobustRegression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class HuberRobustRegressionModel ( + override val weights: Vector, + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { + + override protected def predictPoint( + dataMatrix: Vector, + weightMatrix: Vector, + intercept: Double): Double = { + weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept + } +} + +/** + * Train a Huber Robust regression model with no regularization using Stochastic Gradient Descent. + * This solves the Huber objective function + * f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. + * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + * its corresponding right hand side label y. + * See also the documentation for the precise formulation. + */ +class HuberRobustRegressionWithSGD private[mllib] ( + private var stepSize: Double, + private var numIterations: Int, + private var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable { + + private val gradient = new HuberRobustGradient() + private val updater = new SimpleUpdater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a Huber M-Estimation RobustRegression object with default parameters: {stepSize: 1.0, + * numIterations: 100, miniBatchFraction: 1.0}. + */ + def this() = this(1.0, 100, 1.0) + + override protected def createModel(weights: Vector, intercept: Double) = { + new HuberRobustRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling HuberRobustRegression. + */ +object HuberRobustRegressionWithSGD { + + /** + * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used + * in gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Vector): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction) + .run(input, initialWeights) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. We use the entire + * data set to compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double): HuberRobustRegressionModel = { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using a step size of 1.0. We use the entire data + * set to compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int): HuberRobustRegressionModel = { + train(input, numIterations, 1.0, 1.0) + } +} + +/** + * Train a classification model for HuberRobust Regression using Limited-memory BFGS. + * Standard feature scaling and L2 regularization are used by default. + */ +class HuberRobustRegressionWithLBFGS + extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable { + + this.setFeatureScaling(true) + + override val optimizer = new LBFGS(new HuberRobustGradient, new SquaredL2Updater) + + override protected def createModel(weights: Vector, intercept: Double) = { + new HuberRobustRegressionModel(weights, intercept) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala new file mode 100644 index 0000000000000..ca354421cd72f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.{LocalClusterSparkContext, LinearDataGenerator, + LocalSparkContext} + +class RobustRegressionSuite extends FunSuite with LocalSparkContext { + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("Huber Robust regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(true) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X2 + test("Huber Robust regression without intercept") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X10000 + test("sparse Huber Robust regression without intercept") { + val denseRDD = sc.parallelize( + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2) + val sparseRDD = denseRDD.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + }.cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(sparseRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 10000) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(9999) >= 9.0 && weights(9999) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) + val sparseValidationData = validationData.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + } + val sparseValidationRDD = sc.parallelize(sparseValidationData, 2) + + // Test prediction on RDD. + validatePrediction( + model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData) + + // Test prediction on Array. + validatePrediction( + sparseValidationData.map(row => model.predict(row.features)), sparseValidationData) + } +}