From c562823e96d63773b0a81e0b71e53f8ceca96680 Mon Sep 17 00:00:00 2001 From: sethah Date: Sat, 24 Sep 2016 19:30:41 -0700 Subject: [PATCH 1/6] one pass solver for elasticnet --- .../org/apache/spark/ml/linalg/BLAS.scala | 9 + .../apache/spark/ml/linalg/BLASSuite.scala | 33 ++ .../IterativelyReweightedLeastSquares.scala | 4 +- .../spark/ml/optim/NormalEquationSolver.scala | 165 +++++++ .../spark/ml/optim/WeightedLeastSquares.scala | 258 +++++++++-- .../GeneralizedLinearRegression.scala | 4 +- .../ml/regression/LinearRegression.scala | 20 +- .../mllib/linalg/CholeskyDecomposition.scala | 4 +- ...erativelyReweightedLeastSquaresSuite.scala | 6 +- .../ml/optim/WeightedLeastSquaresSuite.scala | 341 ++++++++++++-- .../ml/regression/LinearRegressionSuite.scala | 433 +++++++++--------- 11 files changed, 973 insertions(+), 304 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 4ca19f3387f0..2b30decb0a9b 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -243,6 +243,15 @@ private[spark] object BLAS extends Serializable { spr(alpha, v, U.values) } + /** + * y += alpha * A * x + * + * @param A The upper triangular part of A in a [[DenseVector]] (column major) + */ + def dspmv(n: Int, alpha: Double, A: DenseVector, x: DenseVector, y: DenseVector): Unit = { + f2jBLAS.dspmv("U", n, alpha, A.values, x.values, 1, 1.0, y.values, 1) + } + /** * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR. * diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala index 6e72a5fff0a9..1b02eee5e65d 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala @@ -422,4 +422,37 @@ class BLASSuite extends SparkMLFunSuite { assert(dATT.multiply(sx) ~== expected absTol 1e-15) assert(sATT.multiply(sx) ~== expected absTol 1e-15) } + + test("spmv") { + /* + A = [[3.0, -2.0, 2.0, -4.0], + [-2.0, -8.0, 4.0, 7.0], + [2.0, 4.0, -3.0, -3.0], + [-4.0, 7.0, -3.0, 0.0]] + x = [5.0, 2.0, 1.0, 9.0] + Ax = [ 45., -93., 48., -3.] + */ + val A = new DenseVector(Array(3.0, -2.0, -8.0, 2.0, 4.0, -3.0, -4.0, 7.0, -3.0, 0.0)) + val x = new DenseVector(Array(5.0, 2.0, -1.0, -9.0)) + val n = 4 + + val y1 = new DenseVector(Array(-3.0, 6.0, -8.0, -3.0)) + val y2 = y1.copy + val y3 = y1.copy + val y4 = y1.copy + + val expected1 = new DenseVector(Array(42.0, -87.0, 40.0, -6.0)) + val expected2 = new DenseVector(Array(19.5, -40.5, 16.0, -4.5)) + val expected3 = new DenseVector(Array(-25.5, 52.5, -32.0, -1.5)) + val expected4 = new DenseVector(Array(-3.0, 6.0, -8.0, -3.0)) + + dspmv(n, 1.0, A, x, y1) + dspmv(n, 0.5, A, x, y2) + dspmv(n, -0.5, A, x, y3) + dspmv(n, 0.0, A, x, y4) + assert(y1 ~== expected1 absTol 1e-8) + assert(y2 ~== expected2 absTol 1e-8) + assert(y3 ~== expected3 absTol 1e-8) + assert(y4 ~== expected4 absTol 1e-8) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index d732f53029e8..8a6b862cda17 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -81,8 +81,8 @@ private[ml] class IterativelyReweightedLeastSquares( } // Estimate new model - model = new WeightedLeastSquares(fitIntercept, regParam, standardizeFeatures = false, - standardizeLabel = false).fit(newInstances) + model = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0, + standardizeFeatures = false, standardizeLabel = false).fit(newInstances) // Check convergence val oldCoefficients = oldModel.coefficients diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala new file mode 100644 index 000000000000..a65a31b6ed6a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.optim + +import breeze.linalg.{DenseVector => BDV} +import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} +import scala.collection.mutable + +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vectors} +import org.apache.spark.mllib.linalg.CholeskyDecomposition + +private[ml] class NormalEquationSolution( + val fitIntercept: Boolean, + private val _coefficients: Array[Double], + val aaInv: Option[DenseVector], + val objectiveHistory: Option[Array[Double]]) { + + def coefficients: DenseVector = { + if (fitIntercept) { + new DenseVector(_coefficients.slice(0, _coefficients.length - 1)) + } else { + new DenseVector(_coefficients) + } + } + + def intercept: Double = if (fitIntercept) _coefficients.last else 0.0 +} + +/** + * Interface for classes that solve the normal equations locally. + */ +private[ml] sealed trait NormalEquationSolver { + + /** Solve the normal equations from summary statistics. */ + def solve( + bBar: Double, + bbBar: Double, + abBar: DenseVector, + aaBar: DenseVector, + aBar: DenseVector): NormalEquationSolution +} + +/** + * A class that solves the normal equations directly, using Cholesky decomposition. + */ +private[ml] class CholeskySolver(val fitIntercept: Boolean) extends NormalEquationSolver { + + def solve( + bBar: Double, + bbBar: Double, + abBar: DenseVector, + aaBar: DenseVector, + aBar: DenseVector): NormalEquationSolution = { + val k = abBar.size + val x = CholeskyDecomposition.solve(aaBar.values, abBar.values) + val aaInv = CholeskyDecomposition.inverse(aaBar.values, k) + + new NormalEquationSolution(fitIntercept, x, Some(new DenseVector(aaInv)), None) + } +} + +/** + * A class for solving the normal equations using Quasi-Newton optimization methods. + */ +private[ml] class QuasiNewtonSolver( + val fitIntercept: Boolean, + maxIter: Int, + tol: Double, + l1RegFunc: Option[(Int) => Double]) extends NormalEquationSolver { + + def solve( + bBar: Double, + bbBar: Double, + abBar: DenseVector, + aaBar: DenseVector, + aBar: DenseVector): NormalEquationSolution = { + val numFeatures = aBar.size + val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + val initialCoefficientsWithIntercept = new Array[Double](numFeaturesPlusIntercept) + if (fitIntercept) { + initialCoefficientsWithIntercept(numFeaturesPlusIntercept - 1) = bBar + } + + val costFun = + new NormalEquationCostFun(bBar, bbBar, abBar, aaBar, aBar, fitIntercept, numFeatures) + val optimizer = l1RegFunc.map { func => + new BreezeOWLQN[Int, BDV[Double]](maxIter, 10, func, tol) + }.getOrElse(new BreezeLBFGS[BDV[Double]](maxIter, 10, tol)) + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + new BDV[Double](initialCoefficientsWithIntercept)) + + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + arrayBuilder += state.adjustedValue + } + val x = state.x.toArray.clone() + new NormalEquationSolution(fitIntercept, x, None, Some(arrayBuilder.result())) + } + + /** + * NormalEquationCostFun implements Breeze's DiffFunction[T] for the normal equation. + * It returns the loss and gradient with L2 regularization at a particular point (coefficients). + * It's used in Breeze's convex optimization routines. + */ + private class NormalEquationCostFun( + bBar: Double, + bbBar: Double, + ab: DenseVector, + aa: DenseVector, + aBar: DenseVector, + fitIntercept: Boolean, + numFeatures: Int) extends DiffFunction[BDV[Double]] { + + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + + override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { + val coef = Vectors.fromBreeze(coefficients).toDense + if (fitIntercept) { + var j = 0 + var dotProd = 0.0 + val coefValues = coef.values + val aBarValues = aBar.values + while (j < numFeatures) { + dotProd += coefValues(j) * aBarValues(j) + j += 1 + } + coefValues(numFeatures) = bBar - dotProd + } + val xxb = new DenseVector(new Array[Double](numFeaturesPlusIntercept)) + BLAS.dspmv(numFeaturesPlusIntercept, 1.0, aa, coef, xxb) + // loss = 1/2 (Y^T W Y - 2 beta^T X^T W Y + beta^T X^T W X beta) + val loss = 0.5 * bbBar - BLAS.dot(ab, coef) + 0.5 * BLAS.dot(coef, xxb) + // -gradient = X^T W X beta - X^T W Y + BLAS.axpy(-1.0, ab, xxb) + (loss, xxb.asBreeze.toDenseVector) + } + } +} + +/** + * Exception thrown when solving a linear system Ax = b for which the matrix A is non-invertible + * (singular). + */ +class SingularMatrixException(message: String, cause: Throwable) + extends IllegalArgumentException(message, cause) { + + def this(message: String) = this(message, null) +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 8f5f4427e1f4..4f489dfaa9f3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -20,19 +20,21 @@ package org.apache.spark.ml.optim import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ -import org.apache.spark.mllib.linalg.CholeskyDecomposition import org.apache.spark.rdd.RDD /** * Model fitted by [[WeightedLeastSquares]]. + * * @param coefficients model coefficients * @param intercept model intercept * @param diagInvAtWA diagonal of matrix (A^T * W * A)^-1 + * @param objectiveHistory objective function (scaled loss + regularization) at each iteration. */ private[ml] class WeightedLeastSquaresModel( val coefficients: DenseVector, val intercept: Double, - val diagInvAtWA: DenseVector) extends Serializable { + val diagInvAtWA: DenseVector, + val objectiveHistory: Array[Double]) extends Serializable { def predict(features: Vector): Double = { BLAS.dot(coefficients, features) + intercept @@ -45,34 +47,48 @@ private[ml] class WeightedLeastSquaresModel( * formulation: * * min,,x,z,, 1/2 sum,,i,, w,,i,, (a,,i,,^T^ x + z - b,,i,,)^2^ / sum,,i,, w_i - * + 1/2 lambda / delta sum,,j,, (sigma,,j,, x,,j,,)^2^, + * + lambda / delta (1/2 (1 - alpha) sum,,j,, (sigma,,j,, x,,j,,)^2^ + * + alpha sum,,j,, abs(sigma,,j,, x,,j,,)), * - * where lambda is the regularization parameter, and delta and sigma,,j,, are controlled by - * [[standardizeLabel]] and [[standardizeFeatures]], respectively. + * where lambda is the regularization parameter, alpha is the ElasticNet mixing parameter, + * and delta and sigma,,j,, are controlled by [[standardizeLabel]] and [[standardizeFeatures]], + * respectively. * * Set [[regParam]] to 0.0 and turn off both [[standardizeFeatures]] and [[standardizeLabel]] to * match R's `lm`. * Turn on [[standardizeLabel]] to match R's `glmnet`. * * @param fitIntercept whether to fit intercept. If false, z is 0.0. - * @param regParam L2 regularization parameter (lambda) + * @param regParam L2 regularization parameter (lambda). + * @param elasticNetParam the ElasticNet mixing parameter. * @param standardizeFeatures whether to standardize features. If true, sigma_,,j,, is the * population standard deviation of the j-th column of A. Otherwise, * sigma,,j,, is 1.0. * @param standardizeLabel whether to standardize label. If true, delta is the population standard * deviation of the label column b. Otherwise, delta is 1.0. + * @param solverType the type of solver to use for optimization. + * @param maxIter maximum number of iterations when stochastic optimization is used. + * @param tol the convergence tolerance of the iterations when stochastic optimization is used. */ private[ml] class WeightedLeastSquares( val fitIntercept: Boolean, val regParam: Double, + val elasticNetParam: Double, val standardizeFeatures: Boolean, - val standardizeLabel: Boolean) extends Logging with Serializable { + val standardizeLabel: Boolean, + val solverType: WeightedLeastSquares.Solver = WeightedLeastSquares.Auto, + val maxIter: Int = 100, + val tol: Double = 1e-6) extends Logging with Serializable { import WeightedLeastSquares._ require(regParam >= 0.0, s"regParam cannot be negative: $regParam") if (regParam == 0.0) { logWarning("regParam is zero, which might cause numerical instability and overfitting.") } + require(elasticNetParam >= 0.0 && elasticNetParam <= 1.0, + s"elasticNetParam must be in [0, 1]: $elasticNetParam") + require(maxIter >= 0, s"maxIter must be a positive integer: $maxIter") + require(tol > 0, s"tol must be greater than zero: $tol") /** * Creates a [[WeightedLeastSquaresModel]] from an RDD of [[Instance]]s. @@ -85,73 +101,193 @@ private[ml] class WeightedLeastSquares( val triK = summary.triK val wSum = summary.wSum val bBar = summary.bBar - val bStd = summary.bStd + val bbBar = summary.bbBar val aBar = summary.aBar - val aVar = summary.aVar + val aStd = summary.aStd val abBar = summary.abBar val aaBar = summary.aaBar - val aaValues = aaBar.values - - if (bStd == 0) { - if (fitIntercept) { - logWarning(s"The standard deviation of the label is zero, so the coefficients will be " + - s"zeros and the intercept will be the mean of the label; as a result, " + - s"training is not needed.") - val coefficients = new DenseVector(Array.ofDim(k-1)) + val aaBarValues = aaBar.values + val numFeatures = abBar.size + val rawBStd = summary.bStd + // if b is constant (rawBStd is zero), then b cannot be scaled. In this case + // setting bStd=abs(bBar) ensures that b is not scaled anymore in l-bfgs algorithm. + val bStd = if (rawBStd == 0.0) math.abs(bBar) else rawBStd + + if (rawBStd == 0) { + if (fitIntercept || bBar == 0.0) { + if (bBar == 0.0) { + logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " + + s"and the intercept will all be zero; as a result, training is not needed.") + } else { + logWarning(s"The standard deviation of the label is zero, so the coefficients will be " + + s"zeros and the intercept will be the mean of the label; as a result, " + + s"training is not needed.") + } + val coefficients = new DenseVector(Array.ofDim(numFeatures)) val intercept = bBar val diagInvAtWA = new DenseVector(Array(0D)) - return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA) + return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA, Array(0D)) + } else { + require(!(regParam > 0.0 && standardizeLabel), "The standard deviation of the label is " + + "zero. Model cannot be regularized with standardization=true") + logWarning(s"The standard deviation of the label is zero. Consider setting " + + s"fitIntercept=true.") + } + } + + val aBarStd = new Array[Double](numFeatures) + var j = 0 + while (j < numFeatures) { + if (aStd(j) == 0.0) { + aBarStd(j) = 0.0 } else { - require(!(regParam > 0.0 && standardizeLabel), - "The standard deviation of the label is zero. " + - "Model cannot be regularized with standardization=true") - logWarning(s"The standard deviation of the label is zero. " + - "Consider setting fitIntercept=true.") + aBarStd(j) = aBar(j) / aStd(j) + } + j += 1 + } + + val abBarStd = new Array[Double](numFeatures) + j = 0 + while (j < numFeatures) { + if (aStd(j) == 0.0) { + abBarStd(j) = 0.0 + } else { + abBarStd(j) = abBar(j) / (aStd(j) * bStd) + } + j += 1 + } + + val aaBarStd = new Array[Double](triK) + j = 0 + var kk = 0 + while (j < numFeatures) { + val aStdJ = aStd(j) + var i = 0 + while (i <= j) { + val aStdI = aStd(i) + if (aStdJ == 0.0 || aStdI == 0.0) { + aaBarStd(kk) = 0.0 + } else { + aaBarStd(kk) = aaBarValues(kk) / (aStdI * aStdJ) + } + kk += 1 + i += 1 } + j += 1 } + val bBarStd = bBar / bStd + val bbBarStd = bbBar / (bStd * bStd) + + val effectiveRegParam = regParam / bStd + val effectiveL1RegParam = elasticNetParam * effectiveRegParam + val effectiveL2RegParam = (1.0 - elasticNetParam) * effectiveRegParam + // add regularization to diagonals var i = 0 - var j = 2 + j = 2 while (i < triK) { - var lambda = regParam - if (standardizeFeatures) { - lambda *= aVar(j - 2) + var lambda = effectiveL2RegParam + if (!standardizeFeatures) { + val std = aStd(j - 2) + if (std != 0.0) { + lambda /= (std * std) + } else { + lambda = 0.0 + } } - if (standardizeLabel && bStd != 0) { - lambda /= bStd + if (!standardizeLabel) { + lambda *= bStd } - aaValues(i) += lambda + aaBarStd(i) += lambda i += j j += 1 } + val aa = getAtA(aaBarStd, aBarStd) + val ab = getAtB(abBarStd, bBarStd) - val aa = if (fitIntercept) { - Array.concat(aaBar.values, aBar.values, Array(1.0)) - } else { - aaBar.values - } - val ab = if (fitIntercept) { - Array.concat(abBar.values, Array(bBar)) + val solver = if ((solverType == WeightedLeastSquares.Auto && elasticNetParam != 0.0) || + (solverType == WeightedLeastSquares.QuasiNewton)) { + val effectiveL1RegFun: Option[(Int) => Double] = if (effectiveL1RegParam != 0.0) { + Some((index: Int) => { + if (fitIntercept && index == numFeatures) { + 0.0 + } else { + if (standardizeFeatures) { + effectiveL1RegParam + } else { + if (aStd(index) != 0.0) effectiveL1RegParam / aStd(index) else 0.0 + } + } + }) + } else { + None + } + new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun) } else { - abBar.values + new CholeskySolver(fitIntercept) } - val x = CholeskyDecomposition.solve(aa, ab) + val solution = solver match { + case cholesky: CholeskySolver => + try { + cholesky.solve(bBarStd, bbBarStd, ab, aa, new DenseVector(aBarStd)) + } catch { + // if Auto solver is used and Cholesky fails due to singular AtA, then fall back to + // quasi-newton solver + case _: SingularMatrixException if solverType == WeightedLeastSquares.Auto => + logWarning("Cholesky solver failed due to singular covariance matrix. " + + "Retrying with Quasi-Newton solver.") + // ab and aa were modified in place, so reconstruct them + val _aa = getAtA(aaBarStd, aBarStd) + val _ab = getAtB(abBarStd, bBarStd) + val newSolver = new QuasiNewtonSolver(fitIntercept, maxIter, tol, None) + newSolver.solve(bBarStd, bbBarStd, _ab, _aa, new DenseVector(aBarStd)) + } + case qn: QuasiNewtonSolver => + qn.solve(bBarStd, bbBarStd, ab, aa, new DenseVector(aBarStd)) + } + val intercept = solution.intercept * bStd + val coefficients = solution.coefficients - val aaInv = CholeskyDecomposition.inverse(aa, k) + // convert the coefficients from the scaled space to the original space + var ii = 0 + val coefficientArray = coefficients.toArray + val len = coefficientArray.length + while (ii < len) { + coefficientArray(ii) *= { if (aStd(ii) != 0.0) bStd / aStd(ii) else 0.0 } + ii += 1 + } // aaInv is a packed upper triangular matrix, here we get all elements on diagonal - val diagInvAtWA = new DenseVector((1 to k).map { i => - aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray) + val diagInvAtWA = solution.aaInv.map { inv => + val values = inv.values + new DenseVector((1 to k).map { i => + val multiplier = if (i == k && fitIntercept) 1.0 else aStd(i - 1) * aStd(i - 1) + values(i + (i - 1) * i / 2 - 1) / (wSum * multiplier) + }.toArray) + }.getOrElse(new DenseVector(Array(0D))) - val (coefficients, intercept) = if (fitIntercept) { - (new DenseVector(x.slice(0, x.length - 1)), x.last) + new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA, + solution.objectiveHistory.getOrElse(Array(0D))) + } + + /** Construct A^T^ A from summary statistics. */ + private def getAtA(aaBar: Array[Double], aBar: Array[Double]): DenseVector = { + if (fitIntercept) { + new DenseVector(Array.concat(aaBar, aBar, Array(1.0))) } else { - (new DenseVector(x), 0.0) + new DenseVector(aaBar.clone()) } + } - new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA) + /** Construct A^T^ B from summary statistics. */ + private def getAtB(abBar: Array[Double], bBar: Double): DenseVector = { + if (fitIntercept) { + new DenseVector(Array.concat(abBar, Array(bBar))) + } else { + new DenseVector(abBar.clone()) + } } } @@ -163,6 +299,13 @@ private[ml] object WeightedLeastSquares { */ val MAX_NUM_FEATURES: Int = 4096 + sealed trait Solver + case object Auto extends Solver + case object Cholesky extends Solver + case object QuasiNewton extends Solver + + val supportedSolvers = Array(Auto, Cholesky, QuasiNewton) + /** * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]]. */ @@ -262,6 +405,11 @@ private[ml] object WeightedLeastSquares { */ def bBar: Double = bSum / wSum + /** + * Weighted mean of squared labels. + */ + def bbBar: Double = bbSum / wSum + /** * Weighted population standard deviation of labels. */ @@ -285,6 +433,24 @@ private[ml] object WeightedLeastSquares { output } + /** + * Weighted population standard deviation of features. + */ + def aStd: DenseVector = { + val std = Array.ofDim[Double](k) + var i = 0 + var j = 2 + val aaValues = aaSum.values + while (i < triK) { + val l = j - 2 + val aw = aSum(l) / wSum + std(l) = math.sqrt(aaValues(i) / wSum - aw * aw) + i += j + j += 1 + } + new DenseVector(std) + } + /** * Weighted population variance of features. */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index bb9e150c4977..33cb25c8c7f6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -262,7 +262,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val if (familyObj == Gaussian && linkObj == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. - val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), + val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true) val wlsModel = optimizer.fit(instances) val model = copyValues( @@ -337,7 +337,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine Instance(eta, instance.weight, instance.features) } // TODO: Make standardizeFeatures and standardizeLabel configurable. - val initialModel = new WeightedLeastSquares(fitIntercept, regParam, + val initialModel = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true) .fit(newInstances) initialModel diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 025ed20c75a0..519f3bdec82d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.linalg.BLAS._ -import org.apache.spark.ml.optim.WeightedLeastSquares +import org.apache.spark.ml.optim.{NormalEquationSolver, WeightedLeastSquares} import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared._ @@ -177,6 +177,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String * If the dimensions of features or the number of partitions are large, * this param could be adjusted to a larger size. * Default is 2. + * * @group expertSetParam */ @Since("2.1.0") @@ -194,21 +195,18 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String Instance(label, weight, features) } - if (($(solver) == "auto" && $(elasticNetParam) == 0.0 && + if (($(solver) == "auto" && numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") { - require($(elasticNetParam) == 0.0, "Only L2 regularization can be used when normal " + - "solver is used.'") - // For low dimensional data, WeightedLeastSquares is more efficiently since the + // For low dimensional data, WeightedLeastSquares is more efficient since the // training algorithm only requires one pass through the data. (SPARK-10668) val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), - $(standardization), true) + elasticNetParam = $(elasticNetParam), $(standardization), true, + solverType = WeightedLeastSquares.Auto, maxIter = $(maxIter), tol = $(tol)) val model = optimizer.fit(instances) // When it is trained by WeightedLeastSquares, training summary does not - // attached returned model. + // attach returned model. val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept)) - // WeightedLeastSquares does not run through iterations. So it does not generate - // an objective history. val (summaryModel, predictionColName) = lrModel.findSummaryModelAndPredictionCol() val trainingSummary = new LinearRegressionTrainingSummary( summaryModel.transform(dataset), @@ -217,7 +215,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String $(featuresCol), summaryModel, model.diagInvAtWA.toArray, - Array(0D)) + model.objectiveHistory) return lrModel.setSummary(trainingSummary) } @@ -243,7 +241,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val yMean = ySummarizer.mean(0) val rawYStd = math.sqrt(ySummarizer.variance(0)) if (rawYStd == 0.0) { - if ($(fitIntercept) || yMean==0.0) { + if ($(fitIntercept) || yMean == 0.0) { // If the rawYStd is zero and fitIntercept=true, then the intercept is yMean with // zero coefficient; as a result, training is not needed. // Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala index 08f8f19c1e77..68771f1afbe8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala @@ -20,6 +20,8 @@ package org.apache.spark.mllib.linalg import com.github.fommil.netlib.LAPACK.{getInstance => lapack} import org.netlib.util.intW +import org.apache.spark.ml.optim.SingularMatrixException + /** * Compute Cholesky decomposition. */ @@ -60,7 +62,7 @@ private[spark] object CholeskyDecomposition { case code if code < 0 => throw new IllegalStateException(s"LAPACK.$method returned $code; arg ${-code} is illegal") case code if code > 0 => - throw new IllegalArgumentException( + throw new SingularMatrixException ( s"LAPACK.$method returned $code because A is not positive definite. Is A derived from " + "a singular matrix (e.g. collinear column values)?") case _ => // do nothing diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala index b30d995794d4..50260952ecb6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala @@ -85,7 +85,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes val eta = math.log(mu / (1.0 - mu)) Instance(eta, instance.weight, instance.features) } - val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, + val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, standardizeFeatures = false, standardizeLabel = false).fit(newInstances) val irls = new IterativelyReweightedLeastSquares(initial, BinomialReweightFunc, fitIntercept, regParam = 0.0, maxIter = 25, tol = 1e-8).fit(instances1) @@ -122,7 +122,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes val eta = math.log(mu) Instance(eta, instance.weight, instance.features) } - val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, + val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, standardizeFeatures = false, standardizeLabel = false).fit(newInstances) val irls = new IterativelyReweightedLeastSquares(initial, PoissonReweightFunc, fitIntercept, regParam = 0.0, maxIter = 25, tol = 1e-8).fit(instances2) @@ -155,7 +155,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes var idx = 0 for (fitIntercept <- Seq(false, true)) { - val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, + val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, standardizeFeatures = false, standardizeLabel = false).fit(instances2) val irls = new IterativelyReweightedLeastSquares(initial, L1RegressionReweightFunc, fitIntercept, regParam = 0.0, maxIter = 200, tol = 1e-7).fit(instances2) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 2cb1af0dee0b..3bf659bacdbc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.optim import org.apache.spark.SparkFunSuite import org.apache.spark.ml.feature.Instance -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.{BLAS, Vectors} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD @@ -60,7 +60,56 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext ), 2) } - test("two collinear features result in error with no regularization") { + test("WLS with strong L1 regularization") { + /* + We initialize the coefficients for WLS QN solver to be weighted average of the label. Check + here that with only an intercept the model converges to bBar. + */ + val bAgg = instances.collect().foldLeft((0.0, 0.0)) { case ((sum, count), Instance(l, w, f)) => + (sum + w * l, count + w) + } + val bBar = bAgg._1 / bAgg._2 + val wls = new WeightedLeastSquares(true, 10, 1.0, true, true) + val model = wls.fit(instances) + assert(model.intercept ~== bBar relTol 1e-6) + } + + test("diagonal inverse of AtWA") { + /* + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + w <- c(1, 2, 3, 4) + W <- Diagonal(length(w), w) + A.intercept <- cbind(A, rep.int(1, length(w))) + AtA.intercept <- t(A.intercept) %*% W %*% A.intercept + inv.intercept <- solve(AtA.intercept) + print(diag(inv.intercept)) + [1] 4.02 0.50 12.02 + + AtA <- t(A) %*% W %*% A + inv <- solve(AtA) + print(diag(inv)) + [1] 0.48336106 0.02079867 + + */ + val expectedWithIntercept = Vectors.dense(4.02, 0.50, 12.02) + val expected = Vectors.dense(0.48336106, 0.02079867) + val wlsWithIntercept = new WeightedLeastSquares(true, 0.0, 0.0, true, true, + solverType = WeightedLeastSquares.Cholesky) + val wlsModelWithIntercept = wlsWithIntercept.fit(instances) + val wls = new WeightedLeastSquares(false, 0.0, 0.0, true, true, + solverType = WeightedLeastSquares.Cholesky) + val wlsModel = wls.fit(instances) + + assert(expectedWithIntercept ~== wlsModelWithIntercept.diagInvAtWA relTol 1e-4) + assert(expected ~== wlsModel.diagInvAtWA relTol 1e-4) + } + + test("two collinear features") { + /* + A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) + b <- c(1, 2, 3, 4) + w <- c(1, 1, 1, 1) + */ val singularInstances = sc.parallelize(Seq( Instance(1.0, 1.0, Vectors.dense(1.0, 2.0)), Instance(2.0, 1.0, Vectors.dense(2.0, 4.0)), @@ -68,16 +117,30 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext Instance(4.0, 1.0, Vectors.dense(4.0, 8.0)) ), 2) - intercept[IllegalArgumentException] { - new WeightedLeastSquares( - false, regParam = 0.0, standardizeFeatures = false, - standardizeLabel = false).fit(singularInstances) + // Cholesky solver does not handle singular input + intercept[SingularMatrixException] { + new WeightedLeastSquares(false, 0.0, 0.0, false, false, + solverType = WeightedLeastSquares.Cholesky).fit(singularInstances) } - // Should not throw an exception - new WeightedLeastSquares( - false, regParam = 1.0, standardizeFeatures = false, - standardizeLabel = false).fit(singularInstances) + // Cholesky should not throw an exception since regularization is applied + new WeightedLeastSquares(false, 1.0, 0.0, false, false, + solverType = WeightedLeastSquares.Cholesky).fit(singularInstances) + + // quasi-newton solvers should handle singular input and make correct predictions + // auto solver should try Cholesky first, then fall back to QN + for (fitIntercept <- Seq(false, true); + standardization <- Seq(false, true); + solver <- Seq(WeightedLeastSquares.Auto, WeightedLeastSquares.QuasiNewton)) { + val singularModel = new WeightedLeastSquares(fitIntercept, regParam = 0.0, + elasticNetParam = 0.0, standardizeFeatures = standardization, + standardizeLabel = standardization, solverType = solver).fit(singularInstances) + + singularInstances.collect().foreach { case Instance(l, w, f) => + val pred = BLAS.dot(singularModel.coefficients, f) + singularModel.intercept + assert(pred ~== l absTol 1e-6) + } + } } test("WLS against lm") { @@ -100,13 +163,15 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext var idx = 0 for (fitIntercept <- Seq(false, true)) { - for (standardization <- Seq(false, true)) { - val wls = new WeightedLeastSquares( - fitIntercept, regParam = 0.0, standardizeFeatures = standardization, - standardizeLabel = standardization).fit(instances) - val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) - } + for (standardization <- Seq(false, true)) { + for (solver <- WeightedLeastSquares.supportedSolvers) { + val wls = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, + standardizeFeatures = standardization, standardizeLabel = standardization, + solverType = solver).fit(instances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + } + } idx += 1 } } @@ -132,24 +197,234 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext var idx = 0 for (fitIntercept <- Seq(false, true)) { for (standardization <- Seq(false, true)) { - val wls = new WeightedLeastSquares( - fitIntercept, regParam = 0.0, standardizeFeatures = standardization, - standardizeLabel = standardization).fit(instancesConstLabel) - val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + for (solver <- WeightedLeastSquares.supportedSolvers) { + val wls = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, + standardizeFeatures = standardization, + standardizeLabel = standardization, solverType = solver).fit(instancesConstLabel) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + } } idx += 1 } + + // when label is constant zero, and fitIntercept is false, we should not train and get all zeros + val instancesConstZeroLabel = instancesConstLabel.map { case Instance(l, w, f) => + Instance(0.0, w, f) + } + for (solver <- WeightedLeastSquares.supportedSolvers) { + val wls = new WeightedLeastSquares(false, 0.0, 0.0, true, true, solverType = solver) + .fit(instancesConstZeroLabel) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual === Vectors.dense(0.0, 0.0, 0.0)) + assert(wls.objectiveHistory === Array(0.0)) + } } test("WLS with regularization when label is constant") { // if regParam is non-zero and standardization is true, the problem is ill-defined and // an exception is thrown. - val wls = new WeightedLeastSquares( - fitIntercept = false, regParam = 0.1, standardizeFeatures = true, - standardizeLabel = true) - intercept[IllegalArgumentException]{ - wls.fit(instancesConstLabel) + for (solver <- WeightedLeastSquares.supportedSolvers) { + val wls = new WeightedLeastSquares( + fitIntercept = false, regParam = 0.1, elasticNetParam = 0.0, standardizeFeatures = true, + standardizeLabel = true, solverType = solver) + intercept[IllegalArgumentException]{ + wls.fit(instancesConstLabel) + } + } + } + + test("WLS against glmnet with constant features") { + /* + R code: + + A <- matrix(c(1, 1, 1, 1, 5, 7, 11, 13), 4, 2) + b <- c(17, 19, 23, 29) + w <- c(1, 2, 3, 4) + */ + val constantFeatures = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(1.0, 5.0)), + Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(1.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(1.0, 13.0)) + ), 2) + + // Cholesky solver does not handle singular input with no regularization + for (fitIntercept <- Seq(false, true); + standardization <- Seq(false, true)) { + val wls = new WeightedLeastSquares(fitIntercept, 0.0, 0.0, standardization, standardization, + solverType = WeightedLeastSquares.Cholesky) + // for the case of no intercept, this would not have failed before but since we train + // in the standardized space now, it will fail + intercept[SingularMatrixException] { + wls.fit(constantFeatures) + } + } + + // should not fail when regularization is added + new WeightedLeastSquares(true, 0.5, 0.0, standardizeFeatures = true, + standardizeLabel = true, solverType = WeightedLeastSquares.Cholesky).fit(constantFeatures) + + /* + for (intercept in c(FALSE, TRUE)) { + for (standardize in c(FALSE, TRUE)) { + for (regParams in list(c(0.0, 0.0), c(0.5, 0.0), c(0.5, 0.5), c(0.5, 1.0))) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=regParams[1], + standardize=standardize, alpha=regParams[2], thresh=1E-14) + print(as.vector(coef(model))) + } + } + } + [1] 0.000000 0.000000 2.253012 + [1] 0.000000 0.000000 2.250857 + [1] 0.000000 0.000000 2.249784 + [1] 0.000000 0.000000 2.248709 + [1] 0.000000 0.000000 2.253012 + [1] 0.000000 0.000000 2.235802 + [1] 0.000000 0.000000 2.238297 + [1] 0.000000 0.000000 2.240811 + [1] 8.218905 0.000000 1.517413 + [1] 8.434286 0.000000 1.496703 + [1] 8.648497 0.000000 1.476106 + [1] 8.865672 0.000000 1.455224 + [1] 8.218905 0.000000 1.517413 + [1] 9.798771 0.000000 1.365503 + [1] 9.919095 0.000000 1.353933 + [1] 10.052804 0.000000 1.341077 + */ + val expectedQuasiNewton = Seq( + Vectors.dense(0.000000, 0.000000, 2.253012), + Vectors.dense(0.000000, 0.000000, 2.250857), + Vectors.dense(0.000000, 0.000000, 2.249784), + Vectors.dense(0.000000, 0.000000, 2.248709), + Vectors.dense(0.000000, 0.000000, 2.253012), + Vectors.dense(0.000000, 0.000000, 2.235802), + Vectors.dense(0.000000, 0.000000, 2.238297), + Vectors.dense(0.000000, 0.000000, 2.240811), + Vectors.dense(8.218905, 0.000000, 1.517413), + Vectors.dense(8.434286, 0.000000, 1.496703), + Vectors.dense(8.648497, 0.000000, 1.476106), + Vectors.dense(8.865672, 0.000000, 1.455224), + Vectors.dense(8.218905, 0.000000, 1.517413), + Vectors.dense(9.798771, 0.000000, 1.365503), + Vectors.dense(9.919095, 0.000000, 1.353933), + Vectors.dense(10.052804, 0.000000, 1.341077)) + var idx = 0 + for (fitIntercept <- Seq(false, true); + standardization <- Seq(false, true); + (lambda, alpha) <- Seq((0.0, 0.0), (0.5, 0.0), (0.5, 0.5), (0.5, 1.0))) { + val wls = new WeightedLeastSquares(fitIntercept, regParam = lambda, elasticNetParam = alpha, + standardizeFeatures = standardization, standardizeLabel = true, + solverType = WeightedLeastSquares.QuasiNewton) + val model = wls.fit(constantFeatures) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expectedQuasiNewton(idx) absTol 1e-6) + idx += 1 + } + } + + test("WLS against glmnet with L1 regularization") { + /* + for (intercept in c(FALSE, TRUE)) { + for (lambda in c(0.1, 0.5, 1.0)) { + for (standardize in c(FALSE, TRUE)) { + for (alpha in c(0.1, 0.5, 1.0)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=lambda, + standardize=standardize, alpha=alpha, thresh=1E-14) + print(as.vector(coef(model))) + } + } + } + } + [1] 0.000000 -3.292821 2.921188 + [1] 0.000000 -3.230854 2.908484 + [1] 0.000000 -3.145586 2.891014 + [1] 0.000000 -2.919246 2.841724 + [1] 0.000000 -2.938323 2.846369 + [1] 0.000000 -2.965397 2.852838 + [1] 0.000000 -2.137858 2.684464 + [1] 0.000000 -1.680094 2.590844 + [1] 0.0000000 -0.8194631 2.4151405 + [1] 0.0000000 -0.9608375 2.4301013 + [1] 0.0000000 -0.6187922 2.3634907 + [1] 0.000000 0.000000 2.240811 + [1] 0.000000 -1.346573 2.521293 + [1] 0.0000000 -0.3680456 2.3212362 + [1] 0.000000 0.000000 2.244406 + [1] 0.000000 0.000000 2.219816 + [1] 0.000000 0.000000 2.223694 + [1] 0.00000 0.00000 2.22861 + [1] 13.5631592 3.2811513 0.3725517 + [1] 13.6953934 3.3336271 0.3497454 + [1] 13.9600276 3.4600170 0.2999941 + [1] 14.2389889 3.6589920 0.2349065 + [1] 15.2374080 4.2119643 0.0325638 + [1] 15.4 4.3 0.0 + [1] 10.442365 1.246065 1.063991 + [1] 8.9580718 0.1938471 1.4090610 + [1] 8.865672 0.000000 1.455224 + [1] 13.0430927 2.4927151 0.5741805 + [1] 13.814429 2.722027 0.455915 + [1] 16.2 3.9 0.0 + [1] 9.8904768 0.7574694 1.2110177 + [1] 9.072226 0.000000 1.435363 + [1] 9.512438 0.000000 1.393035 + [1] 13.3677796 2.1721216 0.6046132 + [1] 14.2554457 2.2285185 0.5084151 + [1] 17.2 3.4 0.0 + */ + + val expected = Seq( + Vectors.dense(0, -3.2928206726474, 2.92118822588649), + Vectors.dense(0, -3.23085414359003, 2.90848366035008), + Vectors.dense(0, -3.14558628299477, 2.89101408157209), + Vectors.dense(0, -2.91924558816421, 2.84172398097327), + Vectors.dense(0, -2.93832343383477, 2.84636891947663), + Vectors.dense(0, -2.96539689593024, 2.85283836322185), + Vectors.dense(0, -2.13785756976542, 2.68446351346705), + Vectors.dense(0, -1.68009377560774, 2.59084422793154), + Vectors.dense(0, -0.819463123385533, 2.41514053108346), + Vectors.dense(0, -0.960837488151064, 2.43010130999756), + Vectors.dense(0, -0.618792151647599, 2.36349074148962), + Vectors.dense(0, 0, 2.24081114726441), + Vectors.dense(0, -1.34657309253953, 2.52129296638512), + Vectors.dense(0, -0.368045602821844, 2.32123616258871), + Vectors.dense(0, 0, 2.24440619621343), + Vectors.dense(0, 0, 2.21981559944924), + Vectors.dense(0, 0, 2.22369447413621), + Vectors.dense(0, 0, 2.22861024633605), + Vectors.dense(13.5631591827557, 3.28115132060568, 0.372551747695477), + Vectors.dense(13.6953934007661, 3.3336271417751, 0.349745414969587), + Vectors.dense(13.960027608754, 3.46001702257532, 0.29999407173994), + Vectors.dense(14.2389889013085, 3.65899196445023, 0.234906458633754), + Vectors.dense(15.2374079667397, 4.21196428071551, 0.0325637953681963), + Vectors.dense(15.4, 4.3, 0), + Vectors.dense(10.4423647474653, 1.24606545153166, 1.06399080283378), + Vectors.dense(8.95807177856822, 0.193847088148233, 1.4090609658784), + Vectors.dense(8.86567164179104, 0, 1.45522388059702), + Vectors.dense(13.0430927453034, 2.49271514356687, 0.574180477650271), + Vectors.dense(13.8144287399675, 2.72202744354555, 0.455915035859752), + Vectors.dense(16.2, 3.9, 0), + Vectors.dense(9.89047681835741, 0.757469417613661, 1.21101772561685), + Vectors.dense(9.07222551185964, 0, 1.43536293155196), + Vectors.dense(9.51243781094527, 0, 1.39303482587065), + Vectors.dense(13.3677796362763, 2.17212164262107, 0.604613180623227), + Vectors.dense(14.2554457236073, 2.22851848830683, 0.508415124978748), + Vectors.dense(17.2, 3.4, 0) + ) + + var idx = 0 + for (fitIntercept <- Seq(false, true); + regParam <- Seq(0.1, 0.5, 1.0); + standardizeFeatures <- Seq(false, true); + elasticNetParam <- Seq(0.1, 0.5, 1.0)) { + val wls = new WeightedLeastSquares( + fitIntercept, regParam, elasticNetParam = elasticNetParam, standardizeFeatures, + standardizeLabel = true) + .fit(instances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + idx += 1 } } @@ -201,11 +476,13 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext for (fitIntercept <- Seq(false, true); regParam <- Seq(0.0, 0.1, 1.0); standardizeFeatures <- Seq(false, true)) { - val wls = new WeightedLeastSquares( - fitIntercept, regParam, standardizeFeatures, standardizeLabel = true) - .fit(instances) - val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + for (solver <- WeightedLeastSquares.supportedSolvers) { + val wls = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0, + standardizeFeatures, standardizeLabel = true, solverType = solver) + .fit(instances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + } idx += 1 } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index 1c94ec67d79d..1098adcb5c2f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -57,7 +57,7 @@ class LinearRegressionSuite xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF() val r = new Random(seed) - // When feature size is larger than 4096, normal optimizer is choosed + // When feature size is larger than 4096, normal optimizer is chosen // as the solver of linear regression in the case of "auto" mode. val featureSize = 4100 datasetWithSparseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput( @@ -155,6 +155,42 @@ class LinearRegressionSuite assert(model.numFeatures === numFeatures) } + test("linear regression handles singular matrices") { + // check for both constant columns with intercept (zero std) and collinear + val singularDataConstantColumn = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(1.0, 5.0).toSparse), + Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(1.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(1.0, 13.0)) + ), 2).toDF() + + Seq("auto", "l-bfgs", "normal").foreach { solver => + val trainer = new LinearRegression().setSolver(solver).setFitIntercept(true) + val model = trainer.fit(singularDataConstantColumn) + // to make it clear that WLS did not solve analytically + intercept[UnsupportedOperationException] { + model.summary.coefficientStandardErrors + } + assert(model.summary.objectiveHistory !== Array(0.0)) + } + + val singularDataCollinearFeatures = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(10.0, 5.0).toSparse), + Instance(19.0, 2.0, Vectors.dense(14.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(22.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(26.0, 13.0)) + ), 2).toDF() + + Seq("auto", "l-bfgs", "normal").foreach { solver => + val trainer = new LinearRegression().setSolver(solver).setFitIntercept(true) + val model = trainer.fit(singularDataCollinearFeatures) + intercept[UnsupportedOperationException] { + model.summary.coefficientStandardErrors + } + assert(model.summary.objectiveHistory !== Array(0.0)) + } + } + test("linear regression with intercept without regularization") { Seq("auto", "l-bfgs", "normal").foreach { solver => val trainer1 = new LinearRegression().setSolver(solver) @@ -233,12 +269,12 @@ class LinearRegressionSuite as.numeric.data3.V2. 4.70011 as.numeric.data3.V3. 7.19943 */ - val coefficientsWithourInterceptR = Vectors.dense(4.70011, 7.19943) + val coefficientsWithoutInterceptR = Vectors.dense(4.70011, 7.19943) assert(modelWithoutIntercept1.intercept ~== 0 absTol 1E-3) - assert(modelWithoutIntercept1.coefficients ~= coefficientsWithourInterceptR relTol 1E-3) + assert(modelWithoutIntercept1.coefficients ~= coefficientsWithoutInterceptR relTol 1E-3) assert(modelWithoutIntercept2.intercept ~== 0 absTol 1E-3) - assert(modelWithoutIntercept2.coefficients ~= coefficientsWithourInterceptR relTol 1E-3) + assert(modelWithoutIntercept2.coefficients ~= coefficientsWithoutInterceptR relTol 1E-3) } } @@ -249,55 +285,47 @@ class LinearRegressionSuite val trainer2 = (new LinearRegression).setElasticNetParam(1.0).setRegParam(0.57) .setSolver(solver).setStandardization(false) - // Normal optimizer is not supported with only L1 regularization case. - if (solver == "normal") { - intercept[IllegalArgumentException] { - trainer1.fit(datasetWithDenseFeature) - trainer2.fit(datasetWithDenseFeature) - } - } else { - val model1 = trainer1.fit(datasetWithDenseFeature) - val model2 = trainer2.fit(datasetWithDenseFeature) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", - alpha = 1.0, lambda = 0.57 )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) 6.242284 - as.numeric.d1.V2. 4.019605 - as.numeric.d1.V3. 6.679538 - */ - val interceptR1 = 6.242284 - val coefficientsR1 = Vectors.dense(4.019605, 6.679538) - assert(model1.intercept ~== interceptR1 relTol 1E-2) - assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, - lambda = 0.57, standardize=FALSE )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) 6.416948 - as.numeric.data.V2. 3.893869 - as.numeric.data.V3. 6.724286 - */ - val interceptR2 = 6.416948 - val coefficientsR2 = Vectors.dense(3.893869, 6.724286) - - assert(model2.intercept ~== interceptR2 relTol 1E-3) - assert(model2.coefficients ~= coefficientsR2 relTol 1E-3) - - model1.transform(datasetWithDenseFeature).select("features", "prediction") - .collect().foreach { - case Row(features: DenseVector, prediction1: Double) => - val prediction2 = - features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + - model1.intercept - assert(prediction1 ~== prediction2 relTol 1E-5) - } + val model1 = trainer1.fit(datasetWithDenseFeature) + val model2 = trainer2.fit(datasetWithDenseFeature) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", + alpha = 1.0, lambda = 0.57 )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.242284 + as.numeric.d1.V2. 4.019605 + as.numeric.d1.V3. 6.679538 + */ + val interceptR1 = 6.242284 + val coefficientsR1 = Vectors.dense(4.019605, 6.679538) + assert(model1.intercept ~== interceptR1 relTol 1E-2) + assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, + lambda = 0.57, standardize=FALSE )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.416948 + as.numeric.data.V2. 3.893869 + as.numeric.data.V3. 6.724286 + */ + val interceptR2 = 6.416948 + val coefficientsR2 = Vectors.dense(3.893869, 6.724286) + + assert(model2.intercept ~== interceptR2 relTol 1E-3) + assert(model2.coefficients ~= coefficientsR2 relTol 1E-3) + + model1.transform(datasetWithDenseFeature).select("features", "prediction") + .collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) } } } @@ -309,56 +337,48 @@ class LinearRegressionSuite val trainer2 = (new LinearRegression).setElasticNetParam(1.0).setRegParam(0.57) .setFitIntercept(false).setStandardization(false).setSolver(solver) - // Normal optimizer is not supported with only L1 regularization case. - if (solver == "normal") { - intercept[IllegalArgumentException] { - trainer1.fit(datasetWithDenseFeature) - trainer2.fit(datasetWithDenseFeature) - } - } else { - val model1 = trainer1.fit(datasetWithDenseFeature) - val model2 = trainer2.fit(datasetWithDenseFeature) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, - lambda = 0.57, intercept=FALSE )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) . - as.numeric.data.V2. 6.272927 - as.numeric.data.V3. 4.782604 - */ - val interceptR1 = 0.0 - val coefficientsR1 = Vectors.dense(6.272927, 4.782604) - - assert(model1.intercept ~== interceptR1 absTol 1E-2) - assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, - lambda = 0.57, intercept=FALSE, standardize=FALSE )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) . - as.numeric.data.V2. 6.207817 - as.numeric.data.V3. 4.775780 - */ - val interceptR2 = 0.0 - val coefficientsR2 = Vectors.dense(6.207817, 4.775780) - - assert(model2.intercept ~== interceptR2 absTol 1E-2) - assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) - - model1.transform(datasetWithDenseFeature).select("features", "prediction") - .collect().foreach { - case Row(features: DenseVector, prediction1: Double) => - val prediction2 = - features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + - model1.intercept - assert(prediction1 ~== prediction2 relTol 1E-5) - } + val model1 = trainer1.fit(datasetWithDenseFeature) + val model2 = trainer2.fit(datasetWithDenseFeature) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, + lambda = 0.57, intercept=FALSE )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.data.V2. 6.272927 + as.numeric.data.V3. 4.782604 + */ + val interceptR1 = 0.0 + val coefficientsR1 = Vectors.dense(6.272927, 4.782604) + + assert(model1.intercept ~== interceptR1 absTol 1E-2) + assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 1.0, + lambda = 0.57, intercept=FALSE, standardize=FALSE )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.data.V2. 6.207817 + as.numeric.data.V3. 4.775780 + */ + val interceptR2 = 0.0 + val coefficientsR2 = Vectors.dense(6.207817, 4.775780) + + assert(model2.intercept ~== interceptR2 absTol 1E-2) + assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) + + model1.transform(datasetWithDenseFeature).select("features", "prediction") + .collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) } } } @@ -471,56 +491,48 @@ class LinearRegressionSuite val trainer2 = (new LinearRegression).setElasticNetParam(0.3).setRegParam(1.6) .setStandardization(false).setSolver(solver) - // Normal optimizer is not supported with non-zero elasticnet parameter. - if (solver == "normal") { - intercept[IllegalArgumentException] { - trainer1.fit(datasetWithDenseFeature) - trainer2.fit(datasetWithDenseFeature) - } - } else { - val model1 = trainer1.fit(datasetWithDenseFeature) - val model2 = trainer2.fit(datasetWithDenseFeature) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, - lambda = 1.6 )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) 5.689855 - as.numeric.d1.V2. 3.661181 - as.numeric.d1.V3. 6.000274 - */ - val interceptR1 = 5.689855 - val coefficientsR1 = Vectors.dense(3.661181, 6.000274) - - assert(model1.intercept ~== interceptR1 relTol 1E-2) - assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6 - standardize=FALSE)) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) 6.113890 - as.numeric.d1.V2. 3.407021 - as.numeric.d1.V3. 6.152512 - */ - val interceptR2 = 6.113890 - val coefficientsR2 = Vectors.dense(3.407021, 6.152512) - - assert(model2.intercept ~== interceptR2 relTol 1E-2) - assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) - - model1.transform(datasetWithDenseFeature).select("features", "prediction") - .collect().foreach { - case Row(features: DenseVector, prediction1: Double) => - val prediction2 = - features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + - model1.intercept - assert(prediction1 ~== prediction2 relTol 1E-5) - } + val model1 = trainer1.fit(datasetWithDenseFeature) + val model2 = trainer2.fit(datasetWithDenseFeature) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, + lambda = 1.6 )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 5.689855 + as.numeric.d1.V2. 3.661181 + as.numeric.d1.V3. 6.000274 + */ + val interceptR1 = 5.689855 + val coefficientsR1 = Vectors.dense(3.661181, 6.000274) + + assert(model1.intercept ~== interceptR1 relTol 1E-2) + assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, lambda = 1.6 + standardize=FALSE)) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) 6.113890 + as.numeric.d1.V2. 3.407021 + as.numeric.d1.V3. 6.152512 + */ + val interceptR2 = 6.113890 + val coefficientsR2 = Vectors.dense(3.407021, 6.152512) + + assert(model2.intercept ~== interceptR2 relTol 1E-2) + assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) + + model1.transform(datasetWithDenseFeature).select("features", "prediction") + .collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) } } } @@ -532,57 +544,49 @@ class LinearRegressionSuite val trainer2 = (new LinearRegression).setElasticNetParam(0.3).setRegParam(1.6) .setFitIntercept(false).setStandardization(false).setSolver(solver) - // Normal optimizer is not supported with non-zero elasticnet parameter. - if (solver == "normal") { - intercept[IllegalArgumentException] { - trainer1.fit(datasetWithDenseFeature) - trainer2.fit(datasetWithDenseFeature) - } - } else { - val model1 = trainer1.fit(datasetWithDenseFeature) - val model2 = trainer2.fit(datasetWithDenseFeature) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, - lambda = 1.6, intercept=FALSE )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) . - as.numeric.d1.V2. 5.643748 - as.numeric.d1.V3. 4.331519 - */ - val interceptR1 = 0.0 - val coefficientsR1 = Vectors.dense(5.643748, 4.331519) - - assert(model1.intercept ~== interceptR1 absTol 1E-2) - assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) - - /* - coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, - lambda = 1.6, intercept=FALSE, standardize=FALSE )) - > coefficients - 3 x 1 sparse Matrix of class "dgCMatrix" - s0 - (Intercept) . - as.numeric.d1.V2. 5.455902 - as.numeric.d1.V3. 4.312266 - - */ - val interceptR2 = 0.0 - val coefficientsR2 = Vectors.dense(5.455902, 4.312266) - - assert(model2.intercept ~== interceptR2 absTol 1E-2) - assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) - - model1.transform(datasetWithDenseFeature).select("features", "prediction") - .collect().foreach { - case Row(features: DenseVector, prediction1: Double) => - val prediction2 = - features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + - model1.intercept - assert(prediction1 ~== prediction2 relTol 1E-5) - } + val model1 = trainer1.fit(datasetWithDenseFeature) + val model2 = trainer2.fit(datasetWithDenseFeature) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, + lambda = 1.6, intercept=FALSE )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.d1.V2. 5.643748 + as.numeric.d1.V3. 4.331519 + */ + val interceptR1 = 0.0 + val coefficientsR1 = Vectors.dense(5.643748, 4.331519) + + assert(model1.intercept ~== interceptR1 absTol 1E-2) + assert(model1.coefficients ~= coefficientsR1 relTol 1E-2) + + /* + coefficients <- coef(glmnet(features, label, family="gaussian", alpha = 0.3, + lambda = 1.6, intercept=FALSE, standardize=FALSE )) + > coefficients + 3 x 1 sparse Matrix of class "dgCMatrix" + s0 + (Intercept) . + as.numeric.d1.V2. 5.455902 + as.numeric.d1.V3. 4.312266 + + */ + val interceptR2 = 0.0 + val coefficientsR2 = Vectors.dense(5.455902, 4.312266) + + assert(model2.intercept ~== interceptR2 absTol 1E-2) + assert(model2.coefficients ~= coefficientsR2 relTol 1E-2) + + model1.transform(datasetWithDenseFeature).select("features", "prediction") + .collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val prediction2 = + features(0) * model1.coefficients(0) + features(1) * model1.coefficients(1) + + model1.intercept + assert(prediction1 ~== prediction2 relTol 1E-5) } } } @@ -757,8 +761,9 @@ class LinearRegressionSuite assert(model.summary.meanAbsoluteError ~== 0.07961668 relTol 1E-4) assert(model.summary.r2 ~== 0.9998737 relTol 1E-4) - // Normal solver uses "WeightedLeastSquares". This algorithm does not generate - // objective history because it does not run through iterations. + // Normal solver uses "WeightedLeastSquares". When no regularization is applied, + // this algorithm uses a direct solver and does not generate an objective history because + // it does not run through iterations. if (solver == "l-bfgs") { // Objective function should be monotonically decreasing for linear regression assert( @@ -776,7 +781,7 @@ class LinearRegressionSuite val pValsR = Array(0, 0, 0) model.summary.devianceResiduals.zip(devianceResidualsR).foreach { x => assert(x._1 ~== x._2 absTol 1E-4) } - model.summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => + model.summary.coefficientStandardErrors.zip(seCoefR).foreach { x => assert(x._1 ~== x._2 absTol 1E-4) } model.summary.tValues.map(_.round).zip(tValsR).foreach{ x => assert(x._1 === x._2) } model.summary.pValues.map(_.round).zip(pValsR).foreach{ x => assert(x._1 === x._2) } @@ -950,6 +955,20 @@ class LinearRegressionSuite assert(x._1 ~== x._2 absTol 1E-3) } model.summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } model.summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + + val modelWithL1 = new LinearRegression() + .setWeightCol("weight") + .setSolver("normal") + .setRegParam(0.5) + .setElasticNetParam(1.0) + .fit(datasetWithWeight) + + assert(modelWithL1.summary.objectiveHistory !== Array(0.0)) + assert( + modelWithL1.summary + .objectiveHistory + .sliding(2) + .forall(x => x(0) >= x(1))) } test("linear regression summary with weighted samples and w/o intercept by normal solver") { From 4f4e32dbe60f44520d49ab75f55f6e4e1db1902a Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 12 Oct 2016 13:48:40 -0700 Subject: [PATCH 2/6] update std arrays in place in WLS, other review comments --- .../org/apache/spark/ml/linalg/BLAS.scala | 16 +- .../apache/spark/ml/linalg/BLASSuite.scala | 24 ++- .../spark/ml/optim/NormalEquationSolver.scala | 24 +-- .../spark/ml/optim/WeightedLeastSquares.scala | 83 +++++------ .../ml/optim/WeightedLeastSquaresSuite.scala | 137 ++++++++++-------- .../ml/regression/LinearRegressionSuite.scala | 6 +- 6 files changed, 167 insertions(+), 123 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 2b30decb0a9b..4b5a0e09db91 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -244,12 +244,20 @@ private[spark] object BLAS extends Serializable { } /** - * y += alpha * A * x + * y := alpha*A*x + beta*y * - * @param A The upper triangular part of A in a [[DenseVector]] (column major) + * @param A The upper triangular part of A in a [[DenseVector]] (column major). + * @param x The [[DenseVector]] transformed by A. + * @param y The [[DenseVector]] to be modified in place. */ - def dspmv(n: Int, alpha: Double, A: DenseVector, x: DenseVector, y: DenseVector): Unit = { - f2jBLAS.dspmv("U", n, alpha, A.values, x.values, 1, 1.0, y.values, 1) + def dspmv( + n: Int, + alpha: Double, + A: DenseVector, + x: DenseVector, + beta: Double, + y: DenseVector): Unit = { + f2jBLAS.dspmv("U", n, alpha, A.values, x.values, 1, beta, y.values, 1) } /** diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala index 1b02eee5e65d..877ac6898334 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala @@ -429,7 +429,7 @@ class BLASSuite extends SparkMLFunSuite { [-2.0, -8.0, 4.0, 7.0], [2.0, 4.0, -3.0, -3.0], [-4.0, 7.0, -3.0, 0.0]] - x = [5.0, 2.0, 1.0, 9.0] + x = [5.0, 2.0, -1.0, -9.0] Ax = [ 45., -93., 48., -3.] */ val A = new DenseVector(Array(3.0, -2.0, -8.0, 2.0, 4.0, -3.0, -4.0, 7.0, -3.0, 0.0)) @@ -440,19 +440,31 @@ class BLASSuite extends SparkMLFunSuite { val y2 = y1.copy val y3 = y1.copy val y4 = y1.copy + val y5 = y1.copy + val y6 = y1.copy + val y7 = y1.copy val expected1 = new DenseVector(Array(42.0, -87.0, 40.0, -6.0)) val expected2 = new DenseVector(Array(19.5, -40.5, 16.0, -4.5)) val expected3 = new DenseVector(Array(-25.5, 52.5, -32.0, -1.5)) val expected4 = new DenseVector(Array(-3.0, 6.0, -8.0, -3.0)) - - dspmv(n, 1.0, A, x, y1) - dspmv(n, 0.5, A, x, y2) - dspmv(n, -0.5, A, x, y3) - dspmv(n, 0.0, A, x, y4) + val expected5 = new DenseVector(Array(43.5, -90.0, 44.0, -4.5)) + val expected6 = new DenseVector(Array(46.5, -96.0, 52.0, -1.5)) + val expected7 = new DenseVector(Array(45.0, -93.0, 48.0, -3.0)) + + dspmv(n, 1.0, A, x, 1.0, y1) + dspmv(n, 0.5, A, x, 1.0, y2) + dspmv(n, -0.5, A, x, 1.0, y3) + dspmv(n, 0.0, A, x, 1.0, y4) + dspmv(n, 1.0, A, x, 0.5, y5) + dspmv(n, 1.0, A, x, -0.5, y6) + dspmv(n, 1.0, A, x, 0.0, y7) assert(y1 ~== expected1 absTol 1e-8) assert(y2 ~== expected2 absTol 1e-8) assert(y3 ~== expected3 absTol 1e-8) assert(y4 ~== expected4 absTol 1e-8) + assert(y5 ~== expected5 absTol 1e-8) + assert(y6 ~== expected6 absTol 1e-8) + assert(y7 ~== expected7 absTol 1e-8) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala index a65a31b6ed6a..1207fc07dc62 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala @@ -26,14 +26,14 @@ import org.apache.spark.mllib.linalg.CholeskyDecomposition private[ml] class NormalEquationSolution( val fitIntercept: Boolean, private val _coefficients: Array[Double], - val aaInv: Option[DenseVector], + val aaInv: Option[Array[Double]], val objectiveHistory: Option[Array[Double]]) { - def coefficients: DenseVector = { + def coefficients: Array[Double] = { if (fitIntercept) { - new DenseVector(_coefficients.slice(0, _coefficients.length - 1)) + _coefficients.slice(0, _coefficients.length - 1) } else { - new DenseVector(_coefficients) + _coefficients } } @@ -69,7 +69,7 @@ private[ml] class CholeskySolver(val fitIntercept: Boolean) extends NormalEquati val x = CholeskyDecomposition.solve(aaBar.values, abBar.values) val aaInv = CholeskyDecomposition.inverse(aaBar.values, k) - new NormalEquationSolution(fitIntercept, x, Some(new DenseVector(aaInv)), None) + new NormalEquationSolution(fitIntercept, x, Some(aaInv), None) } } @@ -143,13 +143,13 @@ private[ml] class QuasiNewtonSolver( } coefValues(numFeatures) = bBar - dotProd } - val xxb = new DenseVector(new Array[Double](numFeaturesPlusIntercept)) - BLAS.dspmv(numFeaturesPlusIntercept, 1.0, aa, coef, xxb) - // loss = 1/2 (Y^T W Y - 2 beta^T X^T W Y + beta^T X^T W X beta) - val loss = 0.5 * bbBar - BLAS.dot(ab, coef) + 0.5 * BLAS.dot(coef, xxb) - // -gradient = X^T W X beta - X^T W Y - BLAS.axpy(-1.0, ab, xxb) - (loss, xxb.asBreeze.toDenseVector) + val aax = new DenseVector(new Array[Double](numFeaturesPlusIntercept)) + BLAS.dspmv(numFeaturesPlusIntercept, 1.0, aa, coef, 1.0, aax) + // loss = 1/2 (b^T W b - 2 x^T A^T W b + x^T A^T W A x) + val loss = 0.5 * bbBar - BLAS.dot(ab, coef) + 0.5 * BLAS.dot(coef, aax) + // -gradient = A^T W A x - A^T W b + BLAS.axpy(-1.0, ab, aax) + (loss, aax.asBreeze.toDenseVector) } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 4f489dfaa9f3..e839db3eaa14 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -46,8 +46,8 @@ private[ml] class WeightedLeastSquaresModel( * Given weighted observations (w,,i,,, a,,i,,, b,,i,,), we use the following weighted least squares * formulation: * - * min,,x,z,, 1/2 sum,,i,, w,,i,, (a,,i,,^T^ x + z - b,,i,,)^2^ / sum,,i,, w_i - * + lambda / delta (1/2 (1 - alpha) sum,,j,, (sigma,,j,, x,,j,,)^2^ + * min,,x,z,, 1/2 sum,,i,, w,,i,, (a,,i,,^T^ x + z - b,,i,,)^2^ / sum,,i,, w,,i,, + * + lambda / delta (1/2 (1 - alpha) sumj,, (sigma,,j,, x,,j,,)^2^ * + alpha sum,,j,, abs(sigma,,j,, x,,j,,)), * * where lambda is the regularization parameter, alpha is the ElasticNet mixing parameter, @@ -58,8 +58,11 @@ private[ml] class WeightedLeastSquaresModel( * match R's `lm`. * Turn on [[standardizeLabel]] to match R's `glmnet`. * + * @note The coefficients and intercept are always trained in the scaled space, but are returned + * on the original scale. [[standardizeFeatures]] and [[standardizeLabel]] can be used to + * control whether regularization is applied in the original space or the scaled space. * @param fitIntercept whether to fit intercept. If false, z is 0.0. - * @param regParam L2 regularization parameter (lambda). + * @param regParam Regularization parameter (lambda). * @param elasticNetParam the ElasticNet mixing parameter. * @param standardizeFeatures whether to standardize features. If true, sigma_,,j,, is the * population standard deviation of the j-th column of A. Otherwise, @@ -67,8 +70,8 @@ private[ml] class WeightedLeastSquaresModel( * @param standardizeLabel whether to standardize label. If true, delta is the population standard * deviation of the label column b. Otherwise, delta is 1.0. * @param solverType the type of solver to use for optimization. - * @param maxIter maximum number of iterations when stochastic optimization is used. - * @param tol the convergence tolerance of the iterations when stochastic optimization is used. + * @param maxIter maximum number of iterations. Only for QuasiNewton solverType. + * @param tol the convergence tolerance of the iterations. Only for QuasiNewton solverType. */ private[ml] class WeightedLeastSquares( val fitIntercept: Boolean, @@ -106,7 +109,6 @@ private[ml] class WeightedLeastSquares( val aStd = summary.aStd val abBar = summary.abBar val aaBar = summary.aaBar - val aaBarValues = aaBar.values val numFeatures = abBar.size val rawBStd = summary.bStd // if b is constant (rawBStd is zero), then b cannot be scaled. In this case @@ -135,42 +137,43 @@ private[ml] class WeightedLeastSquares( } } - val aBarStd = new Array[Double](numFeatures) + val aBarValues = aBar.values var j = 0 while (j < numFeatures) { if (aStd(j) == 0.0) { - aBarStd(j) = 0.0 + aBarValues(j) = 0.0 } else { - aBarStd(j) = aBar(j) / aStd(j) + aBarValues(j) /= aStd(j) } j += 1 } - val abBarStd = new Array[Double](numFeatures) + val abBarValues = abBar.values + val aStdValues = aStd.values j = 0 while (j < numFeatures) { - if (aStd(j) == 0.0) { - abBarStd(j) = 0.0 + if (aStdValues(j) == 0.0) { + abBarValues(j) = 0.0 } else { - abBarStd(j) = abBar(j) / (aStd(j) * bStd) + abBarValues(j) /= (aStdValues(j) * bStd) } j += 1 } - val aaBarStd = new Array[Double](triK) + val aaBarValues = aaBar.values j = 0 - var kk = 0 + var p = 0 while (j < numFeatures) { - val aStdJ = aStd(j) + val aStdJ = aStdValues(j) var i = 0 while (i <= j) { - val aStdI = aStd(i) + val aStdI = aStdValues(i) if (aStdJ == 0.0 || aStdI == 0.0) { - aaBarStd(kk) = 0.0 + aaBarValues(p) = 0.0 } else { - aaBarStd(kk) = aaBarValues(kk) / (aStdI * aStdJ) + aaBarValues(p) /= (aStdI * aStdJ) } - kk += 1 + p += 1 i += 1 } j += 1 @@ -183,7 +186,7 @@ private[ml] class WeightedLeastSquares( val effectiveL1RegParam = elasticNetParam * effectiveRegParam val effectiveL2RegParam = (1.0 - elasticNetParam) * effectiveRegParam - // add regularization to diagonals + // add L2 regularization to diagonals var i = 0 j = 2 while (i < triK) { @@ -199,15 +202,15 @@ private[ml] class WeightedLeastSquares( if (!standardizeLabel) { lambda *= bStd } - aaBarStd(i) += lambda + aaBarValues(i) += lambda i += j j += 1 } - val aa = getAtA(aaBarStd, aBarStd) - val ab = getAtB(abBarStd, bBarStd) + val aa = getAtA(aaBar.values, aBar.values) + val ab = getAtB(abBar.values, bBarStd) - val solver = if ((solverType == WeightedLeastSquares.Auto && elasticNetParam != 0.0) || - (solverType == WeightedLeastSquares.QuasiNewton)) { + val solver = if ((solverType == WeightedLeastSquares.Auto && elasticNetParam != 0.0 && + regParam != 0.0) || (solverType == WeightedLeastSquares.QuasiNewton)) { val effectiveL1RegFun: Option[(Int) => Double] = if (effectiveL1RegParam != 0.0) { Some((index: Int) => { if (fitIntercept && index == numFeatures) { @@ -231,7 +234,7 @@ private[ml] class WeightedLeastSquares( val solution = solver match { case cholesky: CholeskySolver => try { - cholesky.solve(bBarStd, bbBarStd, ab, aa, new DenseVector(aBarStd)) + cholesky.solve(bBarStd, bbBarStd, ab, aa, aBar) } catch { // if Auto solver is used and Cholesky fails due to singular AtA, then fall back to // quasi-newton solver @@ -239,36 +242,34 @@ private[ml] class WeightedLeastSquares( logWarning("Cholesky solver failed due to singular covariance matrix. " + "Retrying with Quasi-Newton solver.") // ab and aa were modified in place, so reconstruct them - val _aa = getAtA(aaBarStd, aBarStd) - val _ab = getAtB(abBarStd, bBarStd) + val _aa = getAtA(aaBar.values, aBar.values) + val _ab = getAtB(abBar.values, bBarStd) val newSolver = new QuasiNewtonSolver(fitIntercept, maxIter, tol, None) - newSolver.solve(bBarStd, bbBarStd, _ab, _aa, new DenseVector(aBarStd)) + newSolver.solve(bBarStd, bbBarStd, _ab, _aa, aBar) } case qn: QuasiNewtonSolver => - qn.solve(bBarStd, bbBarStd, ab, aa, new DenseVector(aBarStd)) + qn.solve(bBarStd, bbBarStd, ab, aa, aBar) } val intercept = solution.intercept * bStd - val coefficients = solution.coefficients + val coefficientArray = solution.coefficients // convert the coefficients from the scaled space to the original space - var ii = 0 - val coefficientArray = coefficients.toArray + var q = 0 val len = coefficientArray.length - while (ii < len) { - coefficientArray(ii) *= { if (aStd(ii) != 0.0) bStd / aStd(ii) else 0.0 } - ii += 1 + while (q < len) { + coefficientArray(q) *= { if (aStdValues(q) != 0.0) bStd / aStdValues(q) else 0.0 } + q += 1 } // aaInv is a packed upper triangular matrix, here we get all elements on diagonal val diagInvAtWA = solution.aaInv.map { inv => - val values = inv.values new DenseVector((1 to k).map { i => - val multiplier = if (i == k && fitIntercept) 1.0 else aStd(i - 1) * aStd(i - 1) - values(i + (i - 1) * i / 2 - 1) / (wSum * multiplier) + val multiplier = if (i == k && fitIntercept) 1.0 else aStdValues(i - 1) * aStdValues(i - 1) + inv(i + (i - 1) * i / 2 - 1) / (wSum * multiplier) }.toArray) }.getOrElse(new DenseVector(Array(0D))) - new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA, + new WeightedLeastSquaresModel(new DenseVector(coefficientArray), intercept, diagInvAtWA, solution.objectiveHistory.getOrElse(Array(0D))) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 3bf659bacdbc..88e74aaaee8f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -28,6 +28,9 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext private var instances: RDD[Instance] = _ private var instancesConstLabel: RDD[Instance] = _ + private var instancesConstZeroLabel: RDD[Instance] = _ + private var collinearInstances: RDD[Instance] = _ + private var constantFeaturesInstances: RDD[Instance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -58,6 +61,46 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(17.0, 4.0, Vectors.dense(3.0, 13.0)) ), 2) + + /* + A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) + b <- c(1, 2, 3, 4) + w <- c(1, 1, 1, 1) + */ + collinearInstances = sc.parallelize(Seq( + Instance(1.0, 1.0, Vectors.dense(1.0, 2.0)), + Instance(2.0, 1.0, Vectors.dense(2.0, 4.0)), + Instance(3.0, 1.0, Vectors.dense(3.0, 6.0)), + Instance(4.0, 1.0, Vectors.dense(4.0, 8.0)) + ), 2) + + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b.const <- c(0, 0, 0, 0) + w <- c(1, 2, 3, 4) + */ + instancesConstZeroLabel = sc.parallelize(Seq( + Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(0.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2) + + /* + R code: + + A <- matrix(c(1, 1, 1, 1, 5, 7, 11, 13), 4, 2) + b <- c(17, 19, 23, 29) + w <- c(1, 2, 3, 4) + */ + constantFeaturesInstances = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(1.0, 5.0)), + Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(1.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(1.0, 13.0)) + ), 2) } test("WLS with strong L1 regularization") { @@ -65,8 +108,8 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext We initialize the coefficients for WLS QN solver to be weighted average of the label. Check here that with only an intercept the model converges to bBar. */ - val bAgg = instances.collect().foldLeft((0.0, 0.0)) { case ((sum, count), Instance(l, w, f)) => - (sum + w * l, count + w) + val bAgg = instances.collect().foldLeft((0.0, 0.0)) { + case ((sum, weightSum), Instance(l, w, f)) => (sum + w * l, weightSum + w) } val bBar = bAgg._1 / bAgg._2 val wls = new WeightedLeastSquares(true, 10, 1.0, true, true) @@ -76,6 +119,7 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext test("diagonal inverse of AtWA") { /* + library(Matrix) A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) w <- c(1, 2, 3, 4) W <- Diagonal(length(w), w) @@ -93,7 +137,8 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext */ val expectedWithIntercept = Vectors.dense(4.02, 0.50, 12.02) val expected = Vectors.dense(0.48336106, 0.02079867) - val wlsWithIntercept = new WeightedLeastSquares(true, 0.0, 0.0, true, true, + val wlsWithIntercept = new WeightedLeastSquares(fitIntercept = true, regParam = 0.0, + elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true, solverType = WeightedLeastSquares.Cholesky) val wlsModelWithIntercept = wlsWithIntercept.fit(instances) val wls = new WeightedLeastSquares(false, 0.0, 0.0, true, true, @@ -105,27 +150,17 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext } test("two collinear features") { - /* - A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) - b <- c(1, 2, 3, 4) - w <- c(1, 1, 1, 1) - */ - val singularInstances = sc.parallelize(Seq( - Instance(1.0, 1.0, Vectors.dense(1.0, 2.0)), - Instance(2.0, 1.0, Vectors.dense(2.0, 4.0)), - Instance(3.0, 1.0, Vectors.dense(3.0, 6.0)), - Instance(4.0, 1.0, Vectors.dense(4.0, 8.0)) - ), 2) - // Cholesky solver does not handle singular input intercept[SingularMatrixException] { - new WeightedLeastSquares(false, 0.0, 0.0, false, false, - solverType = WeightedLeastSquares.Cholesky).fit(singularInstances) + new WeightedLeastSquares(fitIntercept = false, regParam = 0.0, elasticNetParam = 0.0, + standardizeFeatures = false, standardizeLabel = false, + solverType = WeightedLeastSquares.Cholesky).fit(collinearInstances) } // Cholesky should not throw an exception since regularization is applied - new WeightedLeastSquares(false, 1.0, 0.0, false, false, - solverType = WeightedLeastSquares.Cholesky).fit(singularInstances) + new WeightedLeastSquares(fitIntercept = false, regParam = 1.0, elasticNetParam = 0.0, + standardizeFeatures = false, standardizeLabel = false, + solverType = WeightedLeastSquares.Cholesky).fit(collinearInstances) // quasi-newton solvers should handle singular input and make correct predictions // auto solver should try Cholesky first, then fall back to QN @@ -134,9 +169,9 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext solver <- Seq(WeightedLeastSquares.Auto, WeightedLeastSquares.QuasiNewton)) { val singularModel = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, standardizeFeatures = standardization, - standardizeLabel = standardization, solverType = solver).fit(singularInstances) + standardizeLabel = standardization, solverType = solver).fit(collinearInstances) - singularInstances.collect().foreach { case Instance(l, w, f) => + collinearInstances.collect().foreach { case Instance(l, w, f) => val pred = BLAS.dot(singularModel.coefficients, f) + singularModel.intercept assert(pred ~== l absTol 1e-6) } @@ -199,8 +234,8 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext for (standardization <- Seq(false, true)) { for (solver <- WeightedLeastSquares.supportedSolvers) { val wls = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, - standardizeFeatures = standardization, - standardizeLabel = standardization, solverType = solver).fit(instancesConstLabel) + standardizeFeatures = standardization, standardizeLabel = standardization, + solverType = solver).fit(instancesConstLabel) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) assert(actual ~== expected(idx) absTol 1e-4) } @@ -209,12 +244,10 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext } // when label is constant zero, and fitIntercept is false, we should not train and get all zeros - val instancesConstZeroLabel = instancesConstLabel.map { case Instance(l, w, f) => - Instance(0.0, w, f) - } for (solver <- WeightedLeastSquares.supportedSolvers) { - val wls = new WeightedLeastSquares(false, 0.0, 0.0, true, true, solverType = solver) - .fit(instancesConstZeroLabel) + val wls = new WeightedLeastSquares(fitIntercept = false, regParam = 0.0, + elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true, + solverType = solver).fit(instancesConstZeroLabel) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) assert(actual === Vectors.dense(0.0, 0.0, 0.0)) assert(wls.objectiveHistory === Array(0.0)) @@ -225,9 +258,9 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext // if regParam is non-zero and standardization is true, the problem is ill-defined and // an exception is thrown. for (solver <- WeightedLeastSquares.supportedSolvers) { - val wls = new WeightedLeastSquares( - fitIntercept = false, regParam = 0.1, elasticNetParam = 0.0, standardizeFeatures = true, - standardizeLabel = true, solverType = solver) + val wls = new WeightedLeastSquares(fitIntercept = false, regParam = 0.1, + elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true, + solverType = solver) intercept[IllegalArgumentException]{ wls.fit(instancesConstLabel) } @@ -235,35 +268,22 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext } test("WLS against glmnet with constant features") { - /* - R code: - - A <- matrix(c(1, 1, 1, 1, 5, 7, 11, 13), 4, 2) - b <- c(17, 19, 23, 29) - w <- c(1, 2, 3, 4) - */ - val constantFeatures = sc.parallelize(Seq( - Instance(17.0, 1.0, Vectors.dense(1.0, 5.0)), - Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(23.0, 3.0, Vectors.dense(1.0, 11.0)), - Instance(29.0, 4.0, Vectors.dense(1.0, 13.0)) - ), 2) - // Cholesky solver does not handle singular input with no regularization for (fitIntercept <- Seq(false, true); standardization <- Seq(false, true)) { - val wls = new WeightedLeastSquares(fitIntercept, 0.0, 0.0, standardization, standardization, + val wls = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, + standardizeFeatures = standardization, standardizeLabel = standardization, solverType = WeightedLeastSquares.Cholesky) - // for the case of no intercept, this would not have failed before but since we train - // in the standardized space now, it will fail intercept[SingularMatrixException] { - wls.fit(constantFeatures) + wls.fit(constantFeaturesInstances) } } + // TODO: add checks for cholesky output with regularization? // should not fail when regularization is added - new WeightedLeastSquares(true, 0.5, 0.0, standardizeFeatures = true, - standardizeLabel = true, solverType = WeightedLeastSquares.Cholesky).fit(constantFeatures) + new WeightedLeastSquares(fitIntercept = true, regParam = 0.5, elasticNetParam = 0.0, + standardizeFeatures = true, standardizeLabel = true, + solverType = WeightedLeastSquares.Cholesky).fit(constantFeaturesInstances) /* for (intercept in c(FALSE, TRUE)) { @@ -316,15 +336,19 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext val wls = new WeightedLeastSquares(fitIntercept, regParam = lambda, elasticNetParam = alpha, standardizeFeatures = standardization, standardizeLabel = true, solverType = WeightedLeastSquares.QuasiNewton) - val model = wls.fit(constantFeatures) + val model = wls.fit(constantFeaturesInstances) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~== expectedQuasiNewton(idx) absTol 1e-6) idx += 1 } } - test("WLS against glmnet with L1 regularization") { + test("WLS against glmnet with L1/ElasticNet regularization") { /* + R code: + + library(glmnet) + for (intercept in c(FALSE, TRUE)) { for (lambda in c(0.1, 0.5, 1.0)) { for (standardize in c(FALSE, TRUE)) { @@ -418,9 +442,8 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext regParam <- Seq(0.1, 0.5, 1.0); standardizeFeatures <- Seq(false, true); elasticNetParam <- Seq(0.1, 0.5, 1.0)) { - val wls = new WeightedLeastSquares( - fitIntercept, regParam, elasticNetParam = elasticNetParam, standardizeFeatures, - standardizeLabel = true) + val wls = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = elasticNetParam, + standardizeFeatures, standardizeLabel = true, solverType = WeightedLeastSquares.Auto) .fit(instances) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) assert(actual ~== expected(idx) absTol 1e-4) @@ -428,7 +451,7 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext } } - test("WLS against glmnet") { + test("WLS against glmnet with L2 regularization") { /* R code: diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index 1098adcb5c2f..c0e8afbf5e34 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -761,9 +761,9 @@ class LinearRegressionSuite assert(model.summary.meanAbsoluteError ~== 0.07961668 relTol 1E-4) assert(model.summary.r2 ~== 0.9998737 relTol 1E-4) - // Normal solver uses "WeightedLeastSquares". When no regularization is applied, - // this algorithm uses a direct solver and does not generate an objective history because - // it does not run through iterations. + // Normal solver uses "WeightedLeastSquares". If no regularization is applied or only L2 + // regularization is applied, this algorithm uses a direct solver and does not generate an + // objective history because it does not run through iterations. if (solver == "l-bfgs") { // Objective function should be monotonically decreasing for linear regression assert( From 9742e7d0a883d75acfa7794cd3d2d913bab96bcf Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 12 Oct 2016 15:35:50 -0700 Subject: [PATCH 3/6] add more constant features checks --- .../ml/optim/WeightedLeastSquaresSuite.scala | 54 ++++++++++++++----- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 88e74aaaee8f..5f638b488005 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -279,11 +279,38 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext } } - // TODO: add checks for cholesky output with regularization? - // should not fail when regularization is added - new WeightedLeastSquares(fitIntercept = true, regParam = 0.5, elasticNetParam = 0.0, - standardizeFeatures = true, standardizeLabel = true, - solverType = WeightedLeastSquares.Cholesky).fit(constantFeaturesInstances) + // Cholesky also fails when regularization is added but we don't wish to standardize + val wls = new WeightedLeastSquares(true, regParam = 0.5, elasticNetParam = 0.0, + standardizeFeatures = false, standardizeLabel = false, + solverType = WeightedLeastSquares.Cholesky) + intercept[SingularMatrixException] { + wls.fit(constantFeaturesInstances) + } + + /* + for (intercept in c(FALSE, TRUE)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0.5, + standardize=T, alpha=0.0, thresh=1E-14) + print(as.vector(coef(model))) + } + [1] 0.000000 0.000000 2.235802 + [1] 9.798771 0.000000 1.365503 + */ + // should not fail when regularization and standardization are added + val expectedCholesky = Seq( + Vectors.dense(0.0, 0.0, 2.235802), + Vectors.dense(9.798771, 0.0, 1.365503) + ) + var idx = 0 + for (fitIntercept <- Seq(false, true)) { + val wls = new WeightedLeastSquares(fitIntercept = fitIntercept, regParam = 0.5, + elasticNetParam = 0.0, standardizeFeatures = true, + standardizeLabel = true, solverType = WeightedLeastSquares.Cholesky) + .fit(constantFeaturesInstances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expectedCholesky(idx) absTol 1e-6) + idx += 1 + } /* for (intercept in c(FALSE, TRUE)) { @@ -329,16 +356,19 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext Vectors.dense(9.798771, 0.000000, 1.365503), Vectors.dense(9.919095, 0.000000, 1.353933), Vectors.dense(10.052804, 0.000000, 1.341077)) - var idx = 0 + + idx = 0 for (fitIntercept <- Seq(false, true); standardization <- Seq(false, true); (lambda, alpha) <- Seq((0.0, 0.0), (0.5, 0.0), (0.5, 0.5), (0.5, 1.0))) { - val wls = new WeightedLeastSquares(fitIntercept, regParam = lambda, elasticNetParam = alpha, - standardizeFeatures = standardization, standardizeLabel = true, - solverType = WeightedLeastSquares.QuasiNewton) - val model = wls.fit(constantFeaturesInstances) - val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~== expectedQuasiNewton(idx) absTol 1e-6) + for (solver <- Seq(WeightedLeastSquares.Auto, WeightedLeastSquares.Cholesky)) { + val wls = new WeightedLeastSquares(fitIntercept, regParam = lambda, elasticNetParam = alpha, + standardizeFeatures = standardization, standardizeLabel = true, + solverType = WeightedLeastSquares.QuasiNewton) + val model = wls.fit(constantFeaturesInstances) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expectedQuasiNewton(idx) absTol 1e-6) + } idx += 1 } } From c105c05e877f7c765707f322fdcd060aa28fe102 Mon Sep 17 00:00:00 2001 From: sethah Date: Thu, 20 Oct 2016 16:37:25 -0700 Subject: [PATCH 4/6] address review, remove fitIntercept from NE solution --- .../org/apache/spark/ml/linalg/BLAS.scala | 1 + .../spark/ml/optim/NormalEquationSolver.scala | 27 +++++++++---------- .../spark/ml/optim/WeightedLeastSquares.scala | 15 ++++++++--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 4b5a0e09db91..ef3890962494 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -246,6 +246,7 @@ private[spark] object BLAS extends Serializable { /** * y := alpha*A*x + beta*y * + * @param n The order of the n by n matrix A. * @param A The upper triangular part of A in a [[DenseVector]] (column major). * @param x The [[DenseVector]] transformed by A. * @param y The [[DenseVector]] to be modified in place. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala index 1207fc07dc62..8b078b93e6ff 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala @@ -24,20 +24,19 @@ import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vectors} import org.apache.spark.mllib.linalg.CholeskyDecomposition private[ml] class NormalEquationSolution( - val fitIntercept: Boolean, - private val _coefficients: Array[Double], + val coefficients: Array[Double], val aaInv: Option[Array[Double]], val objectiveHistory: Option[Array[Double]]) { - def coefficients: Array[Double] = { - if (fitIntercept) { - _coefficients.slice(0, _coefficients.length - 1) - } else { - _coefficients - } - } - - def intercept: Double = if (fitIntercept) _coefficients.last else 0.0 +// def coefficients: Array[Double] = { +// if (fitIntercept) { +// _coefficients.slice(0, _coefficients.length - 1) +// } else { +// _coefficients +// } +// } +// +// def intercept: Double = if (fitIntercept) _coefficients.last else 0.0 } /** @@ -57,7 +56,7 @@ private[ml] sealed trait NormalEquationSolver { /** * A class that solves the normal equations directly, using Cholesky decomposition. */ -private[ml] class CholeskySolver(val fitIntercept: Boolean) extends NormalEquationSolver { +private[ml] class CholeskySolver extends NormalEquationSolver { def solve( bBar: Double, @@ -69,7 +68,7 @@ private[ml] class CholeskySolver(val fitIntercept: Boolean) extends NormalEquati val x = CholeskyDecomposition.solve(aaBar.values, abBar.values) val aaInv = CholeskyDecomposition.inverse(aaBar.values, k) - new NormalEquationSolution(fitIntercept, x, Some(aaInv), None) + new NormalEquationSolution(x, Some(aaInv), None) } } @@ -111,7 +110,7 @@ private[ml] class QuasiNewtonSolver( arrayBuilder += state.adjustedValue } val x = state.x.toArray.clone() - new NormalEquationSolution(fitIntercept, x, None, Some(arrayBuilder.result())) + new NormalEquationSolution(x, None, Some(arrayBuilder.result())) } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index e839db3eaa14..52720e251055 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -137,6 +137,7 @@ private[ml] class WeightedLeastSquares( } } + // scale aBar to standardized space in-place val aBarValues = aBar.values var j = 0 while (j < numFeatures) { @@ -148,6 +149,7 @@ private[ml] class WeightedLeastSquares( j += 1 } + // scale abBar to standardized space in-place val abBarValues = abBar.values val aStdValues = aStd.values j = 0 @@ -160,6 +162,7 @@ private[ml] class WeightedLeastSquares( j += 1 } + // scale aaBar to standardized space in-place val aaBarValues = aaBar.values j = 0 var p = 0 @@ -219,7 +222,7 @@ private[ml] class WeightedLeastSquares( if (standardizeFeatures) { effectiveL1RegParam } else { - if (aStd(index) != 0.0) effectiveL1RegParam / aStd(index) else 0.0 + if (aStdValues(index) != 0.0) effectiveL1RegParam / aStdValues(index) else 0.0 } } }) @@ -228,7 +231,7 @@ private[ml] class WeightedLeastSquares( } new QuasiNewtonSolver(fitIntercept, maxIter, tol, effectiveL1RegFun) } else { - new CholeskySolver(fitIntercept) + new CholeskySolver } val solution = solver match { @@ -250,8 +253,12 @@ private[ml] class WeightedLeastSquares( case qn: QuasiNewtonSolver => qn.solve(bBarStd, bbBarStd, ab, aa, aBar) } - val intercept = solution.intercept * bStd - val coefficientArray = solution.coefficients + val (coefficientArray, intercept) = if (fitIntercept) { + (solution.coefficients.slice(0, solution.coefficients.length - 1), + solution.coefficients.last * bStd) + } else { + (solution.coefficients, 0.0) + } // convert the coefficients from the scaled space to the original space var q = 0 From cf3407b01a3892abb9be1c943915227f75877996 Mon Sep 17 00:00:00 2001 From: sethah Date: Thu, 20 Oct 2016 16:47:46 -0700 Subject: [PATCH 5/6] cleanup --- .../spark/ml/optim/NormalEquationSolver.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala index 8b078b93e6ff..74d268434320 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala @@ -26,18 +26,7 @@ import org.apache.spark.mllib.linalg.CholeskyDecomposition private[ml] class NormalEquationSolution( val coefficients: Array[Double], val aaInv: Option[Array[Double]], - val objectiveHistory: Option[Array[Double]]) { - -// def coefficients: Array[Double] = { -// if (fitIntercept) { -// _coefficients.slice(0, _coefficients.length - 1) -// } else { -// _coefficients -// } -// } -// -// def intercept: Double = if (fitIntercept) _coefficients.last else 0.0 -} + val objectiveHistory: Option[Array[Double]]) /** * Interface for classes that solve the normal equations locally. From 28f8c2f51638f69a1c1bc43f3ccc80c6dca296ac Mon Sep 17 00:00:00 2001 From: sethah Date: Fri, 21 Oct 2016 17:22:24 -0700 Subject: [PATCH 6/6] minor review feedback --- .../spark/ml/optim/NormalEquationSolver.scala | 14 ++++++++++++-- .../spark/ml/optim/WeightedLeastSquares.scala | 6 +++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala index 74d268434320..2f5299b01022 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/NormalEquationSolver.scala @@ -23,6 +23,16 @@ import scala.collection.mutable import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vectors} import org.apache.spark.mllib.linalg.CholeskyDecomposition +/** + * A class to hold the solution to the normal equations A^T^ W A x = A^T^ W b. + * + * @param coefficients The least squares coefficients. The last element in the coefficients + * is the intercept when bias is added to A. + * @param aaInv An option containing the upper triangular part of (A^T^ W A)^-1^, in column major + * format. None when an optimization program is used to solve the normal equations. + * @param objectiveHistory Option containing the objective history when an optimization program is + * used to solve the normal equations. None when an analytic solver is used. + */ private[ml] class NormalEquationSolution( val coefficients: Array[Double], val aaInv: Option[Array[Double]], @@ -65,7 +75,7 @@ private[ml] class CholeskySolver extends NormalEquationSolver { * A class for solving the normal equations using Quasi-Newton optimization methods. */ private[ml] class QuasiNewtonSolver( - val fitIntercept: Boolean, + fitIntercept: Boolean, maxIter: Int, tol: Double, l1RegFunc: Option[(Int) => Double]) extends NormalEquationSolver { @@ -135,7 +145,7 @@ private[ml] class QuasiNewtonSolver( BLAS.dspmv(numFeaturesPlusIntercept, 1.0, aa, coef, 1.0, aax) // loss = 1/2 (b^T W b - 2 x^T A^T W b + x^T A^T W A x) val loss = 0.5 * bbBar - BLAS.dot(ab, coef) + 0.5 * BLAS.dot(coef, aax) - // -gradient = A^T W A x - A^T W b + // gradient = A^T W A x - A^T W b BLAS.axpy(-1.0, ab, aax) (loss, aax.asBreeze.toDenseVector) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 52720e251055..2223f126f1b6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -63,8 +63,8 @@ private[ml] class WeightedLeastSquaresModel( * control whether regularization is applied in the original space or the scaled space. * @param fitIntercept whether to fit intercept. If false, z is 0.0. * @param regParam Regularization parameter (lambda). - * @param elasticNetParam the ElasticNet mixing parameter. - * @param standardizeFeatures whether to standardize features. If true, sigma_,,j,, is the + * @param elasticNetParam the ElasticNet mixing parameter (alpha). + * @param standardizeFeatures whether to standardize features. If true, sigma,,j,, is the * population standard deviation of the j-th column of A. Otherwise, * sigma,,j,, is 1.0. * @param standardizeLabel whether to standardize label. If true, delta is the population standard @@ -289,7 +289,7 @@ private[ml] class WeightedLeastSquares( } } - /** Construct A^T^ B from summary statistics. */ + /** Construct A^T^ b from summary statistics. */ private def getAtB(abBar: Array[Double], bBar: Double): DenseVector = { if (fitIntercept) { new DenseVector(Array.concat(abBar, Array(bBar)))