From 01601eed116b0dcf03bbe428e9bb9d5d94116dd9 Mon Sep 17 00:00:00 2001 From: Fan Jiang Date: Thu, 6 Aug 2015 17:10:41 -0700 Subject: [PATCH] add RobustRegression and Unit Tests add the objective function, and use Params to switch edit to pass scala style tests make HasRobustRegression in SharedParamsCodeGen.scala, Make the document more explicitly and make k tunable and default to 1.345 by having another param UnitTests with Outliers UnitTests with Outliers Edit HuberAggregator scala codestyle Update LinearRegression.scala --- .../ml/param/shared/SharedParamsCodeGen.scala | 6 +- .../spark/ml/param/shared/sharedParams.scala | 52 +++ .../ml/regression/LinearRegression.scala | 212 ++++++++++- .../ml/regression/LinearRegressionSuite.scala | 357 ++++++++++++++++++ 4 files changed, 623 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 4aff749ff75af..08330a30feeda 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -66,7 +66,11 @@ private[shared] object SharedParamsCodeGen { "options may be added later.", isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"), ParamDesc[Boolean]("standardization", "whether to standardize the training features" + - " before fitting the model", Some("true")), + " before fitting the model.", Some("true")), + ParamDesc[Boolean]("robustRegression", "whether to use robust Huber Cost Function", + Some("false")), + ParamDesc[Double]("robustEfficiency", "threshold in the distribution of errors (>= 0)", + isValid = "ParamValidators.gtEq(0)"), ParamDesc[Long]("seed", "random seed", Some("this.getClass.getName.hashCode.toLong")), ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]." + " For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty", diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index c088c16d1b05d..d04dcb07f8737 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -296,6 +296,38 @@ private[ml] trait HasStandardization extends Params { final def getStandardization: Boolean = $(standardization) } +/** + * Trait for shared param robustRegression (default: false). + */ +private[ml] trait HasRobustRegression extends Params { + + /** + * Param for whether to use robust Huber Cost Function. + * @group param + */ + final val robustRegression: BooleanParam = new BooleanParam(this, "robustRegression", "whether to use robust Huber Cost Function") + + setDefault(robustRegression, false) + + /** @group getParam */ + final def getRobustRegression: Boolean = $(robustRegression) +} + +/** + * Trait for shared param robustEfficiency. + */ +private[ml] trait HasRobustEfficiency extends Params { + + /** + * Param for threshold in the distribution of errors (>= 0). + * @group param + */ + final val robustEfficiency: DoubleParam = new DoubleParam(this, "robustEfficiency", "threshold in the distribution of errors (>= 0)", ParamValidators.gtEq(0)) + + /** @group getParam */ + final def getRobustEfficiency: Double = $(robustEfficiency) +} + /** * Trait for shared param seed (default: this.getClass.getName.hashCode.toLong). */ @@ -388,5 +420,25 @@ private[ml] trait HasSolver extends Params { /** @group getParam */ final def getSolver: String = $(solver) + +} + +/** + * Trait for shared param robust (default: false). + */ +private[ml] trait HasRobust extends Params { + + /** + * Param for whether to fit an intercept term. + * @group param + */ + final val robust: BooleanParam = new BooleanParam(this, "robust", "whether to use robust Huber Cost Function") + + setDefault(robust, false) + + /** @group getParam */ + final def getRobust: Boolean = $(robust) + } + // scalastyle:on diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index c54e08b2ad9a5..04791afb8c66a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -46,7 +46,8 @@ import org.apache.spark.storage.StorageLevel */ private[regression] trait LinearRegressionParams extends PredictorParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol - with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver + with HasFitIntercept with HasStandardization with HasRobustRegression + with HasRobustEfficiency with HasWeightCol with HasSolver /** * :: Experimental :: @@ -102,6 +103,22 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setStandardization(value: Boolean): this.type = set(standardization, value) setDefault(standardization -> true) + /** + * Set the robust Option to determine whether to use robust Huber Cost Function + * Default is false. + * @group setParam + */ + def setRobustRegression(value: Boolean): this.type = set(robustRegression, value) + setDefault(robustRegression -> false) + + /** + * Set the k threshold value in the probability distribution for the errors of robust regression. + * Default is 1.345 which will produce 95% efficiency. + * @group setParam + */ + def setRobustEfficiency(value: Double): this.type = set(robustEfficiency, value) + setDefault(robustEfficiency -> 1.345) + /** * Set the ElasticNet mixing parameter. * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. @@ -156,6 +173,14 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setSolver(value: String): this.type = set(solver, value) setDefault(solver -> "auto") + /** + * Set the robust Option. + * Default is false. + * @group setParam + */ + def setRobust(value: Boolean): this.type = set(robust, value) + setDefault(robust -> false) + override protected def train(dataset: DataFrame): LinearRegressionModel = { // Extract the number of features before deciding optimization solver. val numFeatures = dataset.select(col($(featuresCol))).limit(1).map { @@ -255,8 +280,13 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam - val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept), - $(standardization), featuresStd, featuresMean, effectiveL2RegParam) + val costFun = if ($(robustRegression)) { + new HuberCostFun(instances, yStd, yMean, $(fitIntercept), + $(standardization), featuresStd, featuresMean, effectiveL2RegParam, $(robustEfficiency)) + } else { + new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept), + $(standardization), featuresStd, featuresMean, effectiveL2RegParam) + } val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) { new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) @@ -937,3 +967,179 @@ private class LeastSquaresCostFun( (leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray)) } } + +/** + * HuberCostFun implements Breeze's DiffFunction[T] for Huber cost as used in Robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal + * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109). + * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. + * See also the documentation for the precise formulation. + * It's used in Breeze's convex optimization routines. + */ + +private class HuberAggregator( + weights: Vector, + labelStd: Double, + labelMean: Double, + fitIntercept: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double], + robustEfficiency: Double) extends Serializable { + + private var totalCnt: Long = 0L + private var lossSum = 0.0 + + private val (effectiveWeightsArray: Array[Double], offset: Double, dim: Int) = { + val weightsArray = weights.toArray.clone() + var sum = 0.0 + var i = 0 + val len = weightsArray.length + while (i < len) { + if (featuresStd(i) != 0.0) { + weightsArray(i) /= featuresStd(i) + sum += weightsArray(i) * featuresMean(i) + } else { + weightsArray(i) = 0.0 + } + i += 1 + } + (weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length) + } + + private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray) + + private val gradientSumArray = Array.ofDim[Double](dim) + + /** + * Add a new training data to this HuberAggregator, and update the loss and gradient + * of the objective function. + * + * @param label The label for this data point. + * @param data The features for one data point in dense/sparse vector format to be added + * into this aggregator. + * @return This HuberAggregator object. + */ + def add(label: Double, data: Vector): this.type = { + require(dim == data.size, s"Dimensions mismatch when adding new sample." + + s" Expecting $dim but got ${data.size}.") + + val diff = dot(data, effectiveWeightsVector) - label / labelStd + offset + val k = robustEfficiency + + if (diff < -k) { + lossSum += (-k * diff - 0.5 * k * k) / 2.0 + } else if (diff > k) { + lossSum += (k * diff - 0.5 * k * k) / 2.0 + } else if (diff != 0) { + val localGradientSumArray = gradientSumArray + data.foreachActive { (index, value) => + if (featuresStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += diff * value / featuresStd(index) + } + } + lossSum += diff * diff / 4.0 + } + + totalCnt += 1 + this + } + + /** + * Merge another HuberAggregator, and update the loss and gradient + * of the objective function. + * (Note that it's in place merging; as a result, `this` object will be modified.) + * + * @param other The other HuberAggregator to be merged. + * @return This HuberAggregator object. + */ + def merge(other: HuberAggregator): this.type = { + require(dim == other.dim, s"Dimensions mismatch when merging with another " + + s"HuberAggregator. Expecting $dim but got ${other.dim}.") + + if (other.totalCnt != 0) { + totalCnt += other.totalCnt + lossSum += other.lossSum + + var i = 0 + val localThisGradientSumArray = this.gradientSumArray + val localOtherGradientSumArray = other.gradientSumArray + while (i < dim) { + localThisGradientSumArray(i) += localOtherGradientSumArray(i) + i += 1 + } + } + this + } + + def count: Long = totalCnt + + def loss: Double = lossSum / totalCnt + + def gradient: Vector = { + val result = Vectors.dense(gradientSumArray.clone()) + scal(1.0 / totalCnt, result) + result + } +} + +private class HuberCostFun( + data: RDD[(Double, Vector)], + labelStd: Double, + labelMean: Double, + fitIntercept: Boolean, + standardization: Boolean, + featuresStd: Array[Double], + featuresMean: Array[Double], + effectiveL2regParam: Double, + robustEfficiency: Double) extends DiffFunction[BDV[Double]] { + + override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = { + val w = Vectors.fromBreeze(weights) + + val huberAggregator = data.treeAggregate(new HuberAggregator(w, labelStd, + labelMean, fitIntercept, featuresStd, featuresMean, robustEfficiency))( + seqOp = (c, v) => (c, v) match { + case (aggregator, (label, features)) => aggregator.add(label, features) + }, + combOp = (c1, c2) => (c1, c2) match { + case (aggregator1, aggregator2) => aggregator1.merge(aggregator2) + }) + + val totalGradientArray = huberAggregator.gradient.toArray + + val regVal = if (effectiveL2regParam == 0.0) { + 0.0 + } else { + var sum = 0.0 + w.foreachActive { (index, value) => + // The following code will compute the loss of the regularization; also + // the gradient of the regularization, and add back to totalGradientArray. + sum += { + if (standardization) { + totalGradientArray(index) += effectiveL2regParam * value + value * value + } else { + if (featuresStd(index) != 0.0) { + // If `standardization` is false, we still standardize the data + // to improve the rate of convergence; as a result, we have to + // perform this reverse standardization by penalizing each component + // differently to get effectively the same objective function when + // the training dataset is not standardized. + val temp = value / (featuresStd(index) * featuresStd(index)) + totalGradientArray(index) += effectiveL2regParam * temp + value * temp + } else { + 0.0 + } + } + } + } + 0.5 * effectiveL2regParam * sum + } + + (huberAggregator.loss + regVal, new BDV(totalGradientArray)) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index 273c882c2a47f..b66524a0090a4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -19,11 +19,14 @@ package org.apache.spark.ml.regression import scala.util.Random +import breeze.numerics.abs import org.apache.spark.SparkFunSuite import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.param.ParamsSuite + import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors} + import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext} import org.apache.spark.mllib.util.TestingUtils._ @@ -37,6 +40,8 @@ class LinearRegressionSuite @transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _ @transient var datasetWithSparseFeature: DataFrame = _ @transient var datasetWithWeight: DataFrame = _ + @transient var datasetWithOutliers: DataFrame = _ + @transient var datasetWithoutInterceptWithOutliers: DataFrame = _ /* In `LinearRegressionSuite`, we will make sure that the model trained by SparkML @@ -53,10 +58,32 @@ class LinearRegressionSuite */ override def beforeAll(): Unit = { super.beforeAll() + datasetWithDenseFeature = sqlContext.createDataFrame( sc.parallelize(LinearDataGenerator.generateLinearInput( intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3), xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2)) + + val dataWithOutliers = data.map( x => + if (scala.util.Random.nextDouble() < 0.1) { + LabeledPoint(x.label * 10, x.features) + } + else { + LabeledPoint(x.label, x.features) + } + ) + + val dataWithoutInterceptWithOutliers = dataWithoutIntercept.map( x => + if (scala.util.Random.nextDouble() < 0.1) { + LabeledPoint(x.label * 10, x.features) + } + else { + LabeledPoint(x.label, x.features) + } + ) + + dataset = sqlContext.createDataFrame(data) + /* datasetWithoutIntercept is not needed for correctness testing but is useful for illustrating training model without intercept @@ -92,6 +119,14 @@ class LinearRegressionSuite Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) ), 2)) + + datasetWithoutIntercept = sqlContext.createDataFrame(dataWithoutIntercept) + + datasetWithOutliers = sqlContext.createDataFrame(dataWithOutliers) + + datasetWithoutInterceptWithOutliers = + sqlContext.createDataFrame(dataWithoutInterceptWithOutliers) + } test("params") { @@ -893,6 +928,328 @@ class LinearRegressionSuite testEstimatorAndModelReadWrite(lr, datasetWithWeight, LinearRegressionSuite.allParamSettings, checkModelData) } + + test("Robust params") { + ParamsSuite.checkParams((new LinearRegression).setRobustRegression(true)) + val model = new LinearRegressionModel("RobustReg", Vectors.dense(0.0), 0.0) + ParamsSuite.checkParams(model) + } + + test("Robust regression: default params") { + val lir = (new LinearRegression).setRobustRegression(true) + assert(lir.getLabelCol === "label") + assert(lir.getFeaturesCol === "features") + assert(lir.getPredictionCol === "prediction") + assert(lir.getRegParam === 0.0) + assert(lir.getElasticNetParam === 0.0) + assert(lir.getFitIntercept) + assert(lir.getStandardization) + val model = lir.fit(dataset) + model.transform(dataset) + .select("label", "prediction") + .collect() + assert(model.getFeaturesCol === "features") + assert(model.getPredictionCol === "prediction") + assert(model.intercept !== 0.0) + assert(model.hasParent) + } + + test("Robust regression with intercept without regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true) + val model1 = trainer1.fit(datasetWithOutliers) + val trainer2 = new LinearRegression + val model2 = trainer2.fit(datasetWithOutliers) + + /* + Using the following R code to load the data and train the model using glmnet package. + + library("glmnet") + data <- read.csv("path", header=FALSE, stringsAsFactors=FALSE) + features <- as.matrix(data.frame(as.numeric(data$V2), as.numeric(data$V3))) + label <- as.numeric(data$V1) + weights <- coef(glmnet(features, label, family="gaussian", alpha = 0, lambda = 0)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.298698 + as.numeric.data.V2. 4.700706 + as.numeric.data.V3. 7.199082 + */ + val interceptR = 6.298698 + val weightsR = Vectors.dense(4.700706, 7.199082) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression without intercept without regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setFitIntercept(false) + // Without regularization the results should be the same + val trainer2 = (new LinearRegression).setFitIntercept(false) + val modelWithoutIntercept1 = trainer1.fit(datasetWithoutInterceptWithOutliers) + val modelWithoutIntercept2 = trainer2.fit(datasetWithoutInterceptWithOutliers) + + /* + Then again with the data with no intercept: + > weightsWithoutIntercept + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.data3.V2. 4.70011 + as.numeric.data3.V3. 7.19943 + */ + val weightsWithoutInterceptR = Vectors.dense(4.70011, 7.19943) + + val modelWithoutIntercept1_diff1 = + abs(weightsWithoutInterceptR(0) - modelWithoutIntercept1.weights(0)) + val modelWithoutIntercept2_diff1 = + abs(weightsWithoutInterceptR(0) - modelWithoutIntercept2.weights(0)) + val modelWithoutIntercept1_diff2 = + abs(weightsWithoutInterceptR(1) - modelWithoutIntercept1.weights(1)) + val modelWithoutIntercept2_diff2 = + abs(weightsWithoutInterceptR(1) - modelWithoutIntercept2.weights(1)) + + assert(modelWithoutIntercept1.intercept ~== 0 absTol 1E-3) + assert(modelWithoutIntercept1_diff1 < modelWithoutIntercept2_diff1) + assert(modelWithoutIntercept1_diff2 < modelWithoutIntercept2_diff2) + } + + test("Robust regression with intercept with L1 regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(1.0) + .setRegParam(0.57) + val trainer2 = (new LinearRegression).setElasticNetParam(1.0).setRegParam(0.57) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.24300 + as.numeric.data.V2. 4.024821 + as.numeric.data.V3. 6.679841 + */ + val interceptR = 6.24300 + val weightsR = Vectors.dense(4.024821, 6.679841) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression without intercept with L1 regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(1.0) + .setRegParam(0.57) + .setFitIntercept(false) + val trainer2 = (new LinearRegression).setElasticNetParam(1.0) + .setRegParam(0.57) + .setFitIntercept(false) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57, + intercept=FALSE)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.data.V2. 6.299752 + as.numeric.data.V3. 4.772913 + */ + val interceptR = 0.0 + val weightsR = Vectors.dense(6.299752, 4.772913) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression with intercept with L2 regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.0) + .setRegParam(2.3) + val trainer2 = (new LinearRegression).setElasticNetParam(0.0) + .setRegParam(2.3) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 5.269376 + as.numeric.data.V2. 3.736216 + as.numeric.data.V3. 5.712356) + */ + val interceptR = 5.269376 + val weightsR = Vectors.dense(3.736216, 5.712356) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression without intercept with L2 regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.0) + .setRegParam(2.3) + .setFitIntercept(false) + val trainer2 = (new LinearRegression).setElasticNetParam(0.0) + .setRegParam(2.3) + .setFitIntercept(false) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3, + intercept = FALSE)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.data.V2. 5.522875 + as.numeric.data.V3. 4.214502 + */ + val interceptR = 0.0 + val weightsR = Vectors.dense(5.522875, 4.214502) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression with intercept with ElasticNet regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.3) + .setRegParam(1.6) + val trainer2 = (new LinearRegression).setElasticNetParam(0.3) + .setRegParam(1.6) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.324108 + as.numeric.data.V2. 3.168435 + as.numeric.data.V3. 5.200403 + */ + val interceptR = 5.696056 + val weightsR = Vectors.dense(3.670489, 6.001122) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } + + test("Robust regression without intercept with ElasticNet regularization") { + val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.3) + .setRegParam(1.6) + .setFitIntercept(false) + val trainer2 = (new LinearRegression).setElasticNetParam(0.3) + .setRegParam(1.6) + .setFitIntercept(false) + val model1 = trainer1.fit(datasetWithOutliers) + val model2 = trainer2.fit(datasetWithOutliers) + + /* + weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6, + intercept=FALSE)) + > weights + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.dataM.V2. 5.673348 + as.numeric.dataM.V3. 4.322251 + */ + val interceptR = 0.0 + val weightsR = Vectors.dense(5.673348, 4.322251) + + val model1_diff1 = abs(weightsR(0) - model1.weights(0)) + val model2_diff1 = abs(weightsR(0) - model2.weights(0)) + val model1_diff2 = abs(weightsR(1) - model1.weights(1)) + val model2_diff2 = abs(weightsR(1) - model2.weights(1)) + + assert(model1_diff1 < model2_diff1) + assert(model1_diff2 < model2_diff2) + + model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) + } + } } object LinearRegressionSuite {