diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 55b7510656643..26f3d2f88d789 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -80,23 +80,16 @@ private[ml] class WeightedLeastSquares(
     val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_))
     summary.validate()
     logInfo(s"Number of instances: ${summary.count}.")
-    val k = if (fitIntercept) summary.k + 1 else summary.k
-    val triK = summary.triK
     val wSum = summary.wSum
     val bBar = summary.bBar
     val bStd = summary.bStd
-    val aBar = summary.aBar
-    val aVar = summary.aVar
-    val abBar = summary.abBar
-    val aaBar = summary.aaBar
-    val aaValues = aaBar.values

     if (bStd == 0) {
       if (fitIntercept) {
         logWarning(s"The standard deviation of the label is zero, so the coefficients will be " +
           s"zeros and the intercept will be the mean of the label; as a result, " +
           s"training is not needed.")
-        val coefficients = new DenseVector(Array.ofDim(k-1))
+        val coefficients = new DenseVector(Array.ofDim(summary.k))
         val intercept = bBar
         val diagInvAtWA = new DenseVector(Array(0D))
         return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
@@ -108,6 +101,53 @@ private[ml] class WeightedLeastSquares(
           "Consider setting fitIntercept=true.")
       }
     }
+    /*
+     * If one or more features in the data are constant (i.e. the data matrix has constant
+     * columns), then A^T.A is no longer positive definite and the Cholesky decomposition
+     * fails, because the normal equation does not have a unique solution.
+     * To find a solution, we need to drop the constant columns from the data matrix or,
+     * equivalently, drop the corresponding columns and rows from the A^T.A matrix.
+     * Once those rows/columns are dropped from A^T.A, the Cholesky decomposition produces
+     * the correct coefficients. For the final result, we then insert zeros into the list
+     * of coefficients at the positions of the constant features.
+     */
+    val aVarRaw = summary.aVar.toArray
+    // This keeps track of which features to keep in the model, removing
+    // features with zero variance.
+    val nzVarIndex = aVarRaw.zipWithIndex.filter(_._1 != 0.0).map(_._2)
+    val nz = nzVarIndex.length
+    // If there are features with zero variance, then A^T.A is not positive definite,
+    // and we need to keep track of that.
+    val isSingular = summary.k > nz
+    val k = if (fitIntercept) nz + 1 else nz
+    val triK = nz * (nz + 1) / 2
+
+    val aVar = for (i <- nzVarIndex) yield aVarRaw(i)
+    val aBar = if (isSingular) {
+      val aBarTemp = summary.aBar.toArray
+      for (i <- nzVarIndex) yield aBarTemp(i)
+    } else {
+      summary.aBar.toArray
+    }
+    val abBar = if (isSingular) {
+      val abBarTemp = summary.abBar.toArray
+      for (i <- nzVarIndex) yield abBarTemp(i)
+    } else {
+      summary.abBar.toArray
+    }
+    // NOTE: aaBar represents the upper triangular part of the A^T.A matrix in column-major
+    // order. We need to drop the columns and rows of A^T.A corresponding to the features
+    // that have zero variance. The following logic removes the elements of aaBar
+    // corresponding to zero-variance features, which effectively removes those columns
+    // and rows from A^T.A.
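+    // Illustrative example (not part of the original patch): with summary.k = 3 the packed
+    // layout of aaBar is [a00, a01, a11, a02, a12, a22]. If feature 1 has zero variance,
+    // the filter below keeps [a00, a02, a22], which is exactly the packed upper triangle
+    // of the 2x2 matrix over the remaining features 0 and 2.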
+    val aaBar = if (isSingular) {
+      val aaBarTemp = summary.aaBar.toArray
+      (for {
+        col <- 0 until summary.k
+        row <- 0 to col
+        if aVarRaw(col) != 0.0 && aVarRaw(row) != 0.0
+      } yield aaBarTemp(row + col * (col + 1) / 2)).toArray
+    } else {
+      summary.aaBar.toArray
+    }

     // add regularization to diagonals
     var i = 0
@@ -117,37 +157,50 @@ private[ml] class WeightedLeastSquares(
       if (standardizeFeatures) {
         lambda *= aVar(j - 2)
       }
-      if (standardizeLabel && bStd != 0) {
+      if (standardizeLabel && bStd != 0.0) {
         lambda /= bStd
       }
-      aaValues(i) += lambda
+      aaBar(i) += lambda
       i += j
       j += 1
     }

     val aa = if (fitIntercept) {
-      Array.concat(aaBar.values, aBar.values, Array(1.0))
+      Array.concat(aaBar, aBar, Array(1.0))
     } else {
-      aaBar.values
+      aaBar
     }
     val ab = if (fitIntercept) {
-      Array.concat(abBar.values, Array(bBar))
+      Array.concat(abBar, Array(bBar))
     } else {
-      abBar.values
+      abBar
     }

     val x = CholeskyDecomposition.solve(aa, ab)
-
+    val (coefs, intercept) = if (fitIntercept) {
+      (x.init, x.last)
+    } else {
+      (x, 0.0)
+    }
     val aaInv = CholeskyDecomposition.inverse(aa, k)

-    // aaInv is a packed upper triangular matrix, here we get all elements on diagonal
-    val diagInvAtWA = new DenseVector((1 to k).map { i =>
-      aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray)
+    // aaInv is a packed upper triangular matrix; here we take its diagonal elements
+    val aaInvDiag = (1 to k).map { i =>
+      aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray

-    val (coefficients, intercept) = if (fitIntercept) {
-      (new DenseVector(x.slice(0, x.length - 1)), x.last)
+    val (coefficients, diagInvAtWA) = if (isSingular) {
+      // If there are constant features in the data, we need to add back zeros for the
+      // coefficients of these features.
+      val coefTemp = Array.ofDim[Double](summary.k)
+      val diagTemp = Array.ofDim[Double](summary.k)
+      var i = 0
+      while (i < nz) {
+        coefTemp(nzVarIndex(i)) = coefs(i)
+        diagTemp(nzVarIndex(i)) = aaInvDiag(i)
+        i += 1
+      }
+      (new DenseVector(coefTemp), new DenseVector(diagTemp))
     } else {
-      (new DenseVector(x), 0.0)
+      (new DenseVector(coefs), new DenseVector(aaInvDiag))
     }

     new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 9dee04c8776db..2e4a914144dcb 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -39,6 +39,7 @@ class LinearRegressionSuite
   @transient var datasetWithWeight: DataFrame = _
   @transient var datasetWithWeightConstantLabel: DataFrame = _
   @transient var datasetWithWeightZeroLabel: DataFrame = _
+  @transient var datasetWithWeightConstantFeatures: DataFrame = _

   /*
      In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
@@ -117,6 +118,21 @@ class LinearRegressionSuite
       Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
     ), 2))
+
+    /*
+      R code:
+      A <- matrix(c(0, 1, 2, 3, 9, 9, 9, 9, 5, 7, 11, 13, 6, 6, 6, 6), 4, 4)
+      b <- c(17, 19, 23, 29)
+      w <- c(1, 2, 3, 4)
+      df.const.feature <- as.data.frame(cbind(A, b))
+    */
+    datasetWithWeightConstantFeatures = sqlContext.createDataFrame(
+      sc.parallelize(Seq(
+        Instance(17.0, 1.0, Vectors.dense(0.0, 9.0, 5.0, 6.0)),
+        Instance(19.0, 2.0, Vectors.dense(1.0, 9.0, 7.0, 6.0)),
+        Instance(23.0, 3.0, Vectors.dense(2.0, 9.0, 11.0, 6.0)),
+        Instance(29.0, 4.0, Vectors.dense(3.0, 9.0, 13.0, 6.0))
+      ), 2))
   }

   test("params") {
@@ -622,6 +638,36 @@ class LinearRegressionSuite
     }
   }

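+  // datasetWithWeightConstantFeatures has two constant feature columns (the 2nd, all 9.0,
+  // and the 4th, all 6.0), so the solver should drop them and produce zero coefficients
+  // at those positions; the reference values come from glmnet (see the R code below).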
test("linear regression model with constant features") { + /* + R code: + for (intercept in c(TRUE, FALSE)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0, alpha=0, + thresh=1E-20) + print(as.vector(coef(model))) + } + [1] 18.08 6.08 0.00 -0.60 0.00 + [1] 0.000000 -3.727121 0.000000 3.009983 0.000000 + */ + val expected = Seq( + Vectors.dense(18.08, 6.08, 0.0, -0.60, 0.0), + Vectors.dense(0.0, -3.727121, 0.0, 3.009983, 0.0)) + Seq("auto", "l-bfgs", "normal").foreach { solver => + var idx = 0 + for (fitIntercept <- Seq(true, false)) { + val model = new LinearRegression() + .setFitIntercept(fitIntercept) + .setWeightCol("weight") + .setSolver(solver) + .fit(datasetWithWeightConstantFeatures) + val actual = Vectors.dense(model.intercept, model.coefficients(0), + model.coefficients(1), model.coefficients(2), model.coefficients(3)) + assert(actual ~== expected(idx) absTol 1e-4) + idx += 1 + } + } + } + test("regularized linear regression through origin with constant label") { // The problem is ill-defined if fitIntercept=false, regParam is non-zero. // An exception is thrown in this case.