diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 4aff749ff75af..08330a30feeda 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -66,7 +66,11 @@ private[shared] object SharedParamsCodeGen {
       "options may be added later.",
       isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
     ParamDesc[Boolean]("standardization", "whether to standardize the training features" +
       " before fitting the model", Some("true")),
+    ParamDesc[Boolean]("robustRegression", "whether to use the robust Huber cost function",
+      Some("false")),
+    ParamDesc[Double]("robustEfficiency", "the threshold k of the Huber loss, applied to the" +
+      " standardized errors (>= 0)", isValid = "ParamValidators.gtEq(0)"),
     ParamDesc[Long]("seed", "random seed", Some("this.getClass.getName.hashCode.toLong")),
     ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]." +
       " For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty",
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index c088c16d1b05d..d04dcb07f8737 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -296,6 +296,38 @@ private[ml] trait HasStandardization extends Params {
   final def getStandardization: Boolean = $(standardization)
 }
 
+/**
+ * Trait for shared param robustRegression (default: false).
+ */
+private[ml] trait HasRobustRegression extends Params {
+
+  /**
+   * Param for whether to use the robust Huber cost function.
+   * @group param
+   */
+  final val robustRegression: BooleanParam = new BooleanParam(this, "robustRegression", "whether to use the robust Huber cost function")
+
+  setDefault(robustRegression, false)
+
+  /** @group getParam */
+  final def getRobustRegression: Boolean = $(robustRegression)
+}
+
+/**
+ * Trait for shared param robustEfficiency.
+ */
+private[ml] trait HasRobustEfficiency extends Params {
+
+  /**
+   * Param for the threshold k of the Huber loss, applied to the standardized errors (>= 0).
+   * @group param
+   */
+  final val robustEfficiency: DoubleParam = new DoubleParam(this, "robustEfficiency", "the threshold k of the Huber loss, applied to the standardized errors (>= 0)", ParamValidators.gtEq(0))
+
+  /** @group getParam */
+  final def getRobustEfficiency: Double = $(robustEfficiency)
+}
+
 /**
  * Trait for shared param seed (default: this.getClass.getName.hashCode.toLong).
  */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index c54e08b2ad9a5..04791afb8c66a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -46,7 +46,8 @@ import org.apache.spark.storage.StorageLevel
  */
 private[regression] trait LinearRegressionParams extends PredictorParams
     with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
-    with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
+    with HasFitIntercept with HasStandardization with HasRobustRegression
+    with HasRobustEfficiency with HasWeightCol with HasSolver
 
 /**
  * :: Experimental ::
@@ -102,6 +103,26 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   def setStandardization(value: Boolean): this.type = set(standardization, value)
   setDefault(standardization -> true)
 
+  /**
+   * Set whether to use the robust Huber cost function instead of squared error.
+   * Default is false.
+   * @group setParam
+   */
+  def setRobustRegression(value: Boolean): this.type = set(robustRegression, value)
+  setDefault(robustRegression -> false)
+
+  /**
+   * Set the threshold k of the Huber loss, applied to the standardized errors.
+   * Default is 1.345, which produces 95% efficiency when the errors are normal.
+   * @group setParam
+   */
+  def setRobustEfficiency(value: Double): this.type = set(robustEfficiency, value)
+  setDefault(robustEfficiency -> 1.345)
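+
+  // A minimal usage sketch (assuming the setters above): enable the Huber
+  // objective and keep the default 95%-efficiency threshold:
+  //   val model = new LinearRegression().setRobustRegression(true).fit(training)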
+
   /**
    * Set the ElasticNet mixing parameter.
    * For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
@@ -255,8 +276,13 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
     val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
 
-    val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
-      $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+    val costFun = if ($(robustRegression)) {
+      new HuberCostFun(instances, yStd, yMean, $(fitIntercept),
+        $(standardization), featuresStd, featuresMean, effectiveL2RegParam, $(robustEfficiency))
+    } else {
+      new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
+        $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+    }
 
     val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
       new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -937,3 +963,193 @@ private class LeastSquaresCostFun(
     (leastSquaresAggregator.loss + regVal, new BDV(totalGradientArray))
   }
 }
+
+/**
+ * HuberAggregator computes the gradient and loss for the Huber loss function
+ * as used in robust regression, for instances in sparse or dense vector in a
+ * distributed fashion.
+ * The Huber M-estimator corresponds to a probability distribution for the errors
+ * which is normal in the centre but like a double exponential distribution in the
+ * tails (Hogg 1979: 109).
+ *   L = 1/2 (A weights - y)^2        if |A weights - y| <= k
+ *   L = k |A weights - y| - 1/2 k^2  otherwise
+ * where the default k = 1.345 produces 95% efficiency when the errors are normal
+ * and substantial resistance to outliers otherwise.
+ */
+private class HuberAggregator(
+    weights: Vector,
+    labelStd: Double,
+    labelMean: Double,
+    fitIntercept: Boolean,
+    featuresStd: Array[Double],
+    featuresMean: Array[Double],
+    robustEfficiency: Double) extends Serializable {
+
+  private var totalCnt: Long = 0L
+  private var lossSum = 0.0
+
+  private val (effectiveWeightsArray: Array[Double], offset: Double, dim: Int) = {
+    val weightsArray = weights.toArray.clone()
+    var sum = 0.0
+    var i = 0
+    val len = weightsArray.length
+    while (i < len) {
+      if (featuresStd(i) != 0.0) {
+        weightsArray(i) /= featuresStd(i)
+        sum += weightsArray(i) * featuresMean(i)
+      } else {
+        weightsArray(i) = 0.0
+      }
+      i += 1
+    }
+    (weightsArray, if (fitIntercept) labelMean / labelStd - sum else 0.0, weightsArray.length)
+  }
+
+  private val effectiveWeightsVector = Vectors.dense(effectiveWeightsArray)
+
+  private val gradientSumArray = Array.ofDim[Double](dim)
+
+  /**
+   * Add a new training instance to this HuberAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param label The label for this data point.
+   * @param data The features for one data point in dense/sparse vector format to be added
+   *             into this aggregator.
+   * @return This HuberAggregator object.
+   */
+  def add(label: Double, data: Vector): this.type = {
+    require(dim == data.size, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $dim but got ${data.size}.")
+
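+    // The Huber loss on the standardized residual diff (standard M-estimator
+    // form; a sketch following Hogg 1979):
+    //   L(diff) = diff^2 / 2            if |diff| <= k
+    //   L(diff) = k * |diff| - k^2 / 2  otherwise
+    // with derivative diff on the quadratic centre and +/-k on the linear tails.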
+    val diff = dot(data, effectiveWeightsVector) - label / labelStd + offset
+    val k = robustEfficiency
+
+    if (diff != 0) {
+      // Derivative of the Huber loss w.r.t. diff, clamped to [-k, k].
+      val dLdDiff = math.max(-k, math.min(k, diff))
+      lossSum += {
+        if (diff >= -k && diff <= k) diff * diff / 2.0
+        else k * math.abs(diff) - 0.5 * k * k
+      }
+      val localGradientSumArray = gradientSumArray
+      data.foreachActive { (index, value) =>
+        if (featuresStd(index) != 0.0 && value != 0.0) {
+          localGradientSumArray(index) += dLdDiff * value / featuresStd(index)
+        }
+      }
+    }
+
+    totalCnt += 1
+    this
+  }
+
+  /**
+   * Merge another HuberAggregator, and update the loss and gradient
+   * of the objective function.
+   * (Note that it's in place merging; as a result, `this` object will be modified.)
+   *
+   * @param other The other HuberAggregator to be merged.
+   * @return This HuberAggregator object.
+   */
+  def merge(other: HuberAggregator): this.type = {
+    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
+      s"HuberAggregator. Expecting $dim but got ${other.dim}.")
+
+    if (other.totalCnt != 0) {
+      totalCnt += other.totalCnt
+      lossSum += other.lossSum
+
+      var i = 0
+      val localThisGradientSumArray = this.gradientSumArray
+      val localOtherGradientSumArray = other.gradientSumArray
+      while (i < dim) {
+        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
+        i += 1
+      }
+    }
+    this
+  }
+
+  def count: Long = totalCnt
+
+  def loss: Double = lossSum / totalCnt
+
+  def gradient: Vector = {
+    val result = Vectors.dense(gradientSumArray.clone())
+    scal(1.0 / totalCnt, result)
+    result
+  }
+}
+
+/**
+ * HuberCostFun implements Breeze's DiffFunction[T] for the Huber cost as used in
+ * robust regression. It returns the loss and gradient at a particular point (weights).
+ * It's used in Breeze's convex optimization routines.
+ */
+private class HuberCostFun(
+    data: RDD[(Double, Vector)],
+    labelStd: Double,
+    labelMean: Double,
+    fitIntercept: Boolean,
+    standardization: Boolean,
+    featuresStd: Array[Double],
+    featuresMean: Array[Double],
+    effectiveL2regParam: Double,
+    robustEfficiency: Double) extends DiffFunction[BDV[Double]] {
+
+  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
+    val w = Vectors.fromBreeze(weights)
+
+    val huberAggregator = data.treeAggregate(new HuberAggregator(w, labelStd,
+      labelMean, fitIntercept, featuresStd, featuresMean, robustEfficiency))(
+      seqOp = (c, v) => (c, v) match {
+        case (aggregator, (label, features)) => aggregator.add(label, features)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
+      })
+
+    val totalGradientArray = huberAggregator.gradient.toArray
+
+    val regVal = if (effectiveL2regParam == 0.0) {
+      0.0
+    } else {
+      var sum = 0.0
+      w.foreachActive { (index, value) =>
+        // The following code computes the regularization term's contribution to
+        // the loss and to the gradient, adding the gradient part back into
+        // totalGradientArray.
+        sum += {
+          if (standardization) {
+            totalGradientArray(index) += effectiveL2regParam * value
+            value * value
+          } else {
+            if (featuresStd(index) != 0.0) {
+              // If `standardization` is false, we still standardize the data
+              // to improve the rate of convergence; as a result, we have to
+              // perform this reverse standardization by penalizing each component
+              // differently to get effectively the same objective function when
+              // the training dataset is not standardized.
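+              // (Equivalently: the penalty added here for this component is
+              // 0.5 * effectiveL2regParam * value^2 / featuresStd(index)^2.)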
+              val temp = value / (featuresStd(index) * featuresStd(index))
+              totalGradientArray(index) += effectiveL2regParam * temp
+              value * temp
+            } else {
+              0.0
+            }
+          }
+        }
+      }
+      0.5 * effectiveL2regParam * sum
+    }
+
+    (huberAggregator.loss + regVal, new BDV(totalGradientArray))
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 273c882c2a47f..b66524a0090a4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark.ml.regression
 
 import scala.util.Random
 
+import breeze.numerics.abs
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
 import org.apache.spark.mllib.util.TestingUtils._
@@ -37,6 +39,10 @@ class LinearRegressionSuite
   @transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _
   @transient var datasetWithSparseFeature: DataFrame = _
   @transient var datasetWithWeight: DataFrame = _
+  @transient var dataset: DataFrame = _
+  @transient var datasetWithoutIntercept: DataFrame = _
+  @transient var datasetWithOutliers: DataFrame = _
+  @transient var datasetWithoutInterceptWithOutliers: DataFrame = _
 
   /*
     In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
@@ -53,10 +59,30 @@
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
+
     datasetWithDenseFeature = sqlContext.createDataFrame(
       sc.parallelize(LinearDataGenerator.generateLinearInput(
         intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
         xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2))
+
+    val data = LinearDataGenerator.generateLinearInput(
+      intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1)
+    val dataWithoutIntercept = LinearDataGenerator.generateLinearInput(
+      intercept = 0.0, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1)
+
+    // Corrupt roughly 10% of the labels, with a fixed seed, to create outliers.
+    val random = new Random(seed)
+    val dataWithOutliers = data.map { x =>
+      if (random.nextDouble() < 0.1) LabeledPoint(x.label * 10, x.features) else x
+    }
+    val dataWithoutInterceptWithOutliers = dataWithoutIntercept.map { x =>
+      if (random.nextDouble() < 0.1) LabeledPoint(x.label * 10, x.features) else x
+    }
+
+    dataset = sqlContext.createDataFrame(data)
+
     /*
        datasetWithoutIntercept is not needed for correctness testing but is useful for illustrating
        training model without intercept
@@ -92,6 +118,14 @@ class LinearRegressionSuite
       Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
     ), 2))
+
+    datasetWithoutIntercept = sqlContext.createDataFrame(dataWithoutIntercept)
+
+    datasetWithOutliers = sqlContext.createDataFrame(dataWithOutliers)
+
+    datasetWithoutInterceptWithOutliers =
+      sqlContext.createDataFrame(dataWithoutInterceptWithOutliers)
+
   }
 
   test("params") {
@@ -893,6 +927,335 @@ class LinearRegressionSuite
     testEstimatorAndModelReadWrite(lr, datasetWithWeight, LinearRegressionSuite.allParamSettings,
       checkModelData)
   }
+
+  test("Robust params") {
+    ParamsSuite.checkParams((new LinearRegression).setRobustRegression(true))
+    val model = new LinearRegressionModel("RobustReg", Vectors.dense(0.0), 0.0)
+    ParamsSuite.checkParams(model)
+  }
+
+  test("Robust regression: default params") {
+    val lir = (new LinearRegression).setRobustRegression(true)
+    assert(lir.getLabelCol === "label")
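+    // Robust-specific defaults (robustEfficiency defaults to 1.345, set in
+    // LinearRegression's setDefault).
+    assert(lir.getRobustRegression)
+    assert(lir.getRobustEfficiency === 1.345)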
+    assert(lir.getFeaturesCol === "features")
+    assert(lir.getPredictionCol === "prediction")
+    assert(lir.getRegParam === 0.0)
+    assert(lir.getElasticNetParam === 0.0)
+    assert(lir.getFitIntercept)
+    assert(lir.getStandardization)
+    val model = lir.fit(dataset)
+    model.transform(dataset)
+      .select("label", "prediction")
+      .collect()
+    assert(model.getFeaturesCol === "features")
+    assert(model.getPredictionCol === "prediction")
+    assert(model.intercept !== 0.0)
+    assert(model.hasParent)
+  }
+
+  test("Robust regression with intercept without regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val trainer2 = new LinearRegression
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      The following R code loads the data and trains the model using the glmnet package.
+
+      library("glmnet")
+      data <- read.csv("path", header=FALSE, stringsAsFactors=FALSE)
+      features <- as.matrix(data.frame(as.numeric(data$V2), as.numeric(data$V3)))
+      label <- as.numeric(data$V1)
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 0, lambda = 0))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         6.298698
+      as.numeric.data.V2. 4.700706
+      as.numeric.data.V3. 7.199082
+    */
+    val interceptR = 6.298698
+    val weightsR = Vectors.dense(4.700706, 7.199082)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept without regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setFitIntercept(false)
+    // The plain least-squares fit serves as the baseline for comparison.
+    val trainer2 = (new LinearRegression).setFitIntercept(false)
+    val modelWithoutIntercept1 = trainer1.fit(datasetWithoutInterceptWithOutliers)
+    val modelWithoutIntercept2 = trainer2.fit(datasetWithoutInterceptWithOutliers)
+
+    /*
+      Then again with the data with no intercept:
+      > weightsWithoutIntercept
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                               s0
+      (Intercept)          .
+      as.numeric.data3.V2. 4.70011
+      as.numeric.data3.V3. 7.19943
+    */
+    val weightsWithoutInterceptR = Vectors.dense(4.70011, 7.19943)
+
+    val modelWithoutIntercept1_diff1 =
+      abs(weightsWithoutInterceptR(0) - modelWithoutIntercept1.weights(0))
+    val modelWithoutIntercept2_diff1 =
+      abs(weightsWithoutInterceptR(0) - modelWithoutIntercept2.weights(0))
+    val modelWithoutIntercept1_diff2 =
+      abs(weightsWithoutInterceptR(1) - modelWithoutIntercept1.weights(1))
+    val modelWithoutIntercept2_diff2 =
+      abs(weightsWithoutInterceptR(1) - modelWithoutIntercept2.weights(1))
+
+    assert(modelWithoutIntercept1.intercept ~== 0 absTol 1E-3)
+    assert(modelWithoutIntercept1_diff1 < modelWithoutIntercept2_diff1)
+    assert(modelWithoutIntercept1_diff2 < modelWithoutIntercept2_diff2)
+  }
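+
+  // In the regularized tests below, the robust and the plain least-squares
+  // estimators are both fit on the corrupted data; the robust fit is expected
+  // to land closer to the glmnet reference weights from the clean data.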
+  test("Robust regression with intercept with L1 regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(1.0)
+      .setRegParam(0.57)
+    val trainer2 = (new LinearRegression).setElasticNetParam(1.0).setRegParam(0.57)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         6.24300
+      as.numeric.data.V2. 4.024821
+      as.numeric.data.V3. 6.679841
+    */
+    val interceptR = 6.24300
+    val weightsR = Vectors.dense(4.024821, 6.679841)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with L1 regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(1.0)
+      .setRegParam(0.57)
+      .setFitIntercept(false)
+    val trainer2 = (new LinearRegression).setElasticNetParam(1.0)
+      .setRegParam(0.57)
+      .setFitIntercept(false)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, lambda = 0.57,
+        intercept=FALSE))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         .
+      as.numeric.data.V2. 6.299752
+      as.numeric.data.V3. 4.772913
+    */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(6.299752, 4.772913)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression with intercept with L2 regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.0)
+      .setRegParam(2.3)
+    val trainer2 = (new LinearRegression).setElasticNetParam(0.0)
+      .setRegParam(2.3)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         5.269376
+      as.numeric.data.V2. 3.736216
+      as.numeric.data.V3. 5.712356
+    */
+    val interceptR = 5.269376
+    val weightsR = Vectors.dense(3.736216, 5.712356)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with L2 regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.0)
+      .setRegParam(2.3)
+      .setFitIntercept(false)
+    val trainer2 = (new LinearRegression).setElasticNetParam(0.0)
+      .setRegParam(2.3)
+      .setFitIntercept(false)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.0, lambda = 2.3,
+        intercept = FALSE))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         .
+      as.numeric.data.V2. 5.522875
+      as.numeric.data.V3. 4.214502
+    */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(5.522875, 4.214502)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression with intercept with ElasticNet regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.3)
+      .setRegParam(1.6)
+    val trainer2 = (new LinearRegression).setElasticNetParam(0.3)
+      .setRegParam(1.6)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                              s0
+      (Intercept)         6.324108
+      as.numeric.data.V2. 3.168435
+      as.numeric.data.V3. 5.200403
+    */
+    val interceptR = 6.324108
+    val weightsR = Vectors.dense(3.168435, 5.200403)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
+
+  test("Robust regression without intercept with ElasticNet regularization") {
+    val trainer1 = (new LinearRegression).setRobustRegression(true).setElasticNetParam(0.3)
+      .setRegParam(1.6)
+      .setFitIntercept(false)
+    val trainer2 = (new LinearRegression).setElasticNetParam(0.3)
+      .setRegParam(1.6)
+      .setFitIntercept(false)
+    val model1 = trainer1.fit(datasetWithOutliers)
+    val model2 = trainer2.fit(datasetWithOutliers)
+
+    /*
+      weights <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6,
+        intercept=FALSE))
+      > weights
+      3 x 1 sparse Matrix of class "dgCMatrix"
+                               s0
+      (Intercept)          .
+      as.numeric.dataM.V2. 5.673348
+      as.numeric.dataM.V3. 4.322251
+    */
+    val interceptR = 0.0
+    val weightsR = Vectors.dense(5.673348, 4.322251)
+
+    val model1_diff1 = abs(weightsR(0) - model1.weights(0))
+    val model2_diff1 = abs(weightsR(0) - model2.weights(0))
+    val model1_diff2 = abs(weightsR(1) - model1.weights(1))
+    val model2_diff2 = abs(weightsR(1) - model2.weights(1))
+
+    assert(model1_diff1 < model2_diff1)
+    assert(model1_diff2 < model2_diff2)
+
+    model1.transform(datasetWithOutliers).select("features", "prediction").collect().foreach {
+      case Row(features: DenseVector, prediction1: Double) =>
+        val prediction2 =
+          features(0) * model1.weights(0) + features(1) * model1.weights(1) + model1.intercept
+        assert(prediction1 ~== prediction2 relTol 1E-5)
+    }
+  }
 }
 
 object LinearRegressionSuite {