From 9412ef439d523e58e5d6b8294628c2196f6f9019 Mon Sep 17 00:00:00 2001
From: Imran Younus
Date: Wed, 9 Mar 2016 11:38:12 -0800
Subject: [PATCH 1/3] remove constant features from training in normal solver

---
 .../spark/ml/optim/WeightedLeastSquares.scala | 93 +++++++++++++++----
 .../ml/regression/LinearRegressionSuite.scala | 46 +++++++++
 2 files changed, 119 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 55b7510656643..72e62d80d5ced 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -80,23 +80,16 @@ private[ml] class WeightedLeastSquares(
     val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_))
     summary.validate()
     logInfo(s"Number of instances: ${summary.count}.")
-    val k = if (fitIntercept) summary.k + 1 else summary.k
-    val triK = summary.triK
     val wSum = summary.wSum
     val bBar = summary.bBar
     val bStd = summary.bStd
-    val aBar = summary.aBar
-    val aVar = summary.aVar
-    val abBar = summary.abBar
-    val aaBar = summary.aaBar
-    val aaValues = aaBar.values

     if (bStd == 0) {
       if (fitIntercept) {
         logWarning(s"The standard deviation of the label is zero, so the coefficients will be " +
           s"zeros and the intercept will be the mean of the label; as a result, " +
           s"training is not needed.")
-        val coefficients = new DenseVector(Array.ofDim(k-1))
+        val coefficients = new DenseVector(Array.ofDim(summary.k))
         val intercept = bBar
         val diagInvAtWA = new DenseVector(Array(0D))
         return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
@@ -108,6 +101,53 @@ private[ml] class WeightedLeastSquares(
           "Consider setting fitIntercept=true.")
       }
     }
+    /*
+      If one or more of the features in the data are constant (i.e., the data matrix has constant columns),
+      then A^T.A is no longer positive definite and Cholesky decomposition fails (because the
+      normal equation does not have a unique solution).
+      In order to find a solution, we need to drop constant columns from the data matrix. Or,
+      we can drop the corresponding column and row from the A^T.A matrix.
+      Once we drop rows/columns from the A^T.A matrix, the Cholesky decomposition will produce
+      correct coefficients. But, for the final result, we need to add zeros to the list of
+      coefficients corresponding to the constant features.
+    */
+    val aVarRaw = summary.aVar.values
+    // this will keep track of features to keep in the model, and remove
+    // features with zero variance.
+    val nzVarIndex = aVarRaw.zipWithIndex.filter( _._1 != 0 ).map( _._2 )
+    val nz = nzVarIndex.length
+    // if there are features with zero variance, then ATA is not positive definite, and we need to
+    // keep track of that.
+    val singular = summary.k > nz
+    val k = if (fitIntercept) nz + 1 else nz
+    val triK = nz * (nz + 1) / 2
+
+    val aVar = if (singular) {
+      for (i <- nzVarIndex) yield {aVarRaw(i)}
+    } else {
+      aVarRaw
+    }
+    val aBar = if (singular) {
+      val aBarTemp = summary.aBar.values
+      for (i <- nzVarIndex) yield {aBarTemp(i)}
+    } else {
+      summary.aBar.values
+    }
+    val abBar = if (singular) {
+      val abBarTemp = summary.abBar.values
+      for (i <- nzVarIndex) yield {abBarTemp(i)}
+    } else {
+      summary.abBar.values
+    }
+    val aaBar = if (singular) {
+      val aaBarTemp = summary.aaBar.values
+      (for { col <- 0 until summary.k
+             row <- 0 to col
+             if aVarRaw(col) != 0 & aVarRaw(row) != 0 } yield
+        aaBarTemp(row + col * (col + 1) / 2)).toArray
+    } else {
+      summary.aaBar.values
+    }

     // add regularization to diagonals
     var i = 0
@@ -120,34 +160,47 @@ private[ml] class WeightedLeastSquares(
       if (standardizeLabel && bStd != 0) {
         lambda /= bStd
       }
-      aaValues(i) += lambda
+      aaBar(i) += lambda
       i += j
       j += 1
     }

     val aa = if (fitIntercept) {
-      Array.concat(aaBar.values, aBar.values, Array(1.0))
+      Array.concat(aaBar, aBar, Array(1.0))
     } else {
-      aaBar.values
+      aaBar
     }
     val ab = if (fitIntercept) {
-      Array.concat(abBar.values, Array(bBar))
+      Array.concat(abBar, Array(bBar))
     } else {
-      abBar.values
+      abBar
     }

     val x = CholeskyDecomposition.solve(aa, ab)
-
+    val (coefs, intercept) = if (fitIntercept) {
+      (x.slice(0, x.length - 1), x.last)
+    } else {
+      (x, 0.0)
+    }
     val aaInv = CholeskyDecomposition.inverse(aa, k)

-    // aaInv is a packed upper triangular matrix, here we get all elements on diagonal
-    val diagInvAtWA = new DenseVector((1 to k).map { i =>
-      aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray)
+    val aaInvDiag = (1 to k).map { i =>
+      aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray

-    val (coefficients, intercept) = if (fitIntercept) {
-      (new DenseVector(x.slice(0, x.length - 1)), x.last)
+    val (coefficients, diagInvAtWA) = if (singular) {
+      // if there are constant features in the data, we need to add zeros for the coefficients
+      // for these features.
+      val coefTemp = Array.ofDim[Double](summary.k)
+      val diagTemp = Array.ofDim[Double](summary.k)
+      var i = 0
+      while (i < nz) {
+        coefTemp(nzVarIndex(i)) = coefs(i)
+        diagTemp(nzVarIndex(i)) = aaInvDiag(i)
+        i += 1
+      }
+      (new DenseVector(coefTemp), new DenseVector(diagTemp))
     } else {
-      (new DenseVector(x), 0.0)
+      (new DenseVector(coefs), new DenseVector(aaInvDiag))
     }

     new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 9dee04c8776db..2e4a914144dcb 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -39,6 +39,7 @@ class LinearRegressionSuite
   @transient var datasetWithWeight: DataFrame = _
   @transient var datasetWithWeightConstantLabel: DataFrame = _
   @transient var datasetWithWeightZeroLabel: DataFrame = _
+  @transient var datasetWithWeightConstantFeatures: DataFrame = _

   /*
      In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
@@ -117,6 +118,21 @@ class LinearRegressionSuite
       Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
     ), 2))
+
+    /*
+       R code:
+       A <- matrix(c(0, 1, 2, 3, 9, 9, 9, 9, 5, 7, 11, 13, 6, 6, 6, 6), 4, 4)
+       b <- c(17, 19, 23, 29)
+       w <- c(1, 2, 3, 4)
+       df.const.feature <- as.data.frame(cbind(A, b))
+    */
+    datasetWithWeightConstantFeatures = sqlContext.createDataFrame(
+      sc.parallelize(Seq(
+        Instance(17.0, 1.0, Vectors.dense(0.0, 9.0, 5.0, 6.0)),
+        Instance(19.0, 2.0, Vectors.dense(1.0, 9.0, 7.0, 6.0)),
+        Instance(23.0, 3.0, Vectors.dense(2.0, 9.0, 11.0, 6.0)),
+        Instance(29.0, 4.0, Vectors.dense(3.0, 9.0, 13.0, 6.0))
+      ), 2))
   }

   test("params") {
@@ -622,6 +638,36 @@ class LinearRegressionSuite
     }
   }

+  test("linear regression model with constant features") {
+    /*
+       R code:
+       for (intercept in c(TRUE, FALSE)) {
+         model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0, alpha=0,
+                         thresh=1E-20)
+         print(as.vector(coef(model)))
+       }
+       [1] 18.08  6.08  0.00 -0.60  0.00
+       [1] 0.000000 -3.727121  0.000000  3.009983  0.000000
+    */
+    val expected = Seq(
+      Vectors.dense(18.08, 6.08, 0.0, -0.60, 0.0),
+      Vectors.dense(0.0, -3.727121, 0.0, 3.009983, 0.0))
+    Seq("auto", "l-bfgs", "normal").foreach { solver =>
+      var idx = 0
+      for (fitIntercept <- Seq(true, false)) {
+        val model = new LinearRegression()
+          .setFitIntercept(fitIntercept)
+          .setWeightCol("weight")
+          .setSolver(solver)
+          .fit(datasetWithWeightConstantFeatures)
+        val actual = Vectors.dense(model.intercept, model.coefficients(0),
+          model.coefficients(1), model.coefficients(2), model.coefficients(3))
+        assert(actual ~== expected(idx) absTol 1e-4)
+        idx += 1
+      }
+    }
+  }
+
   test("regularized linear regression through origin with constant label") {
     // The problem is ill-defined if fitIntercept=false, regParam is non-zero.
     // An exception is thrown in this case.
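Before the second revision, it helps to spell out the packed layout that patch 1 manipulates: aaBar stores the upper triangle of the k x k matrix A^T.A in column major order, so the element (row, col) with row <= col lives at index row + col * (col + 1) / 2. The standalone Scala sketch below (plain Scala with no Spark dependency; the object and method names are illustrative only, not part of the patch) demonstrates the two operations the patch builds on: dropping the rows/columns of zero-variance features from the packed triangle, and scattering the reduced solution back into a full-length coefficient vector with zeros for the constant features.

object PackedTriangularDemo {

  // Keep only the rows/columns of the packed upper triangle whose feature
  // variance is non-zero; this mirrors the aaBar filtering in the patch.
  def dropZeroVarianceRowsCols(packed: Array[Double], k: Int, aVar: Array[Double]): Array[Double] = {
    (for {
      col <- 0 until k
      row <- 0 to col
      if aVar(col) != 0.0 && aVar(row) != 0.0
    } yield packed(row + col * (col + 1) / 2)).toArray
  }

  // Scatter the nz solved coefficients back into a length-k vector,
  // leaving zeros at the positions of the constant features.
  def expandCoefficients(coefs: Array[Double], nzVarIndex: Array[Int], k: Int): Array[Double] = {
    val out = Array.ofDim[Double](k)
    var i = 0
    while (i < coefs.length) {
      out(nzVarIndex(i)) = coefs(i)
      i += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    // Three features; feature 1 is constant (zero variance).
    val aVar = Array(2.0, 0.0, 5.0)
    val nzVarIndex = aVar.zipWithIndex.filter(_._1 != 0.0).map(_._2) // Array(0, 2)
    // Packed upper triangle of a 3x3 A^T.A: entries (0,0),(0,1),(1,1),(0,2),(1,2),(2,2).
    val packed = Array(4.0, 1.0, 9.0, 2.0, 3.0, 16.0)
    println(dropZeroVarianceRowsCols(packed, 3, aVar).mkString(", "))
    // prints 4.0, 2.0, 16.0: the packed triangle of the 2x2 submatrix for features 0 and 2
    println(expandCoefficients(Array(0.5, -0.25), nzVarIndex, 3).mkString(", "))
    // prints 0.5, 0.0, -0.25: a zero is re-inserted for the constant feature
  }
}

Working on the packed triangle directly avoids materializing the full k x k matrix; the same index arithmetic reappears when the patch reads the diagonal of the inverse for diagInvAtWA.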
From 652d2bd2c04e6d872451677c54751c2267fa2699 Mon Sep 17 00:00:00 2001
From: Imran Younus
Date: Fri, 11 Mar 2016 17:56:10 -0800
Subject: [PATCH 2/3] incorporating suggestions made by sethah

---
 .../spark/ml/optim/WeightedLeastSquares.scala | 32 +++++++++++--------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 72e62d80d5ced..7df39cff83ac9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -102,19 +102,19 @@ private[ml] class WeightedLeastSquares(
       }
     }
     /*
-      If one or more of the features in the data are constant (i.e., the data matrix has constant columns),
-      then A^T.A is no longer positive definite and Cholesky decomposition fails (because the
-      normal equation does not have a unique solution).
-      In order to find a solution, we need to drop constant columns from the data matrix. Or,
-      we can drop the corresponding column and row from the A^T.A matrix.
-      Once we drop rows/columns from the A^T.A matrix, the Cholesky decomposition will produce
-      correct coefficients. But, for the final result, we need to add zeros to the list of
-      coefficients corresponding to the constant features.
-    */
+     * If one or more of the features in the data are constant (i.e., the data matrix has
+     * constant columns), then A^T.A is no longer positive definite and Cholesky decomposition
+     * fails (because the normal equation does not have a unique solution).
+     * In order to find a solution, we need to drop constant columns from the data matrix. Or,
+     * we can drop the corresponding column and row from the A^T.A matrix.
+     * Once we drop rows/columns from the A^T.A matrix, the Cholesky decomposition will produce
+     * correct coefficients. But, for the final result, we need to add zeros to the list of
+     * coefficients corresponding to the constant features.
+     */
     val aVarRaw = summary.aVar.values
     // this will keep track of features to keep in the model, and remove
     // features with zero variance.
-    val nzVarIndex = aVarRaw.zipWithIndex.filter( _._1 != 0 ).map( _._2 )
+    val nzVarIndex = aVarRaw.zipWithIndex.filter(_._1 != 0).map(_._2)
     val nz = nzVarIndex.length
     // if there are features with zero variance, then ATA is not positive definite, and we need to
     // keep track of that.
@@ -123,13 +123,13 @@ private[ml] class WeightedLeastSquares(
     val triK = nz * (nz + 1) / 2

     val aVar = if (singular) {
-      for (i <- nzVarIndex) yield {aVarRaw(i)}
+      for (i <- nzVarIndex) yield aVarRaw(i)
     } else {
       aVarRaw
     }
     val aBar = if (singular) {
       val aBarTemp = summary.aBar.values
-      for (i <- nzVarIndex) yield {aBarTemp(i)}
+      for (i <- nzVarIndex) yield aBarTemp(i)
     } else {
       summary.aBar.values
     }
@@ -139,11 +139,15 @@ private[ml] class WeightedLeastSquares(
     } else {
       summary.abBar.values
     }
+    // NOTE: aaBar represents upper triangular part of A^T.A matrix in column major order.
+    // We need to drop columns and rows from A^T.A corresponding to the features which have
+    // zero variance. The following logic removes elements from aaBar corresponding to zero
+    // variance, which effectively removes columns and rows from A^T.A.
     val aaBar = if (singular) {
       val aaBarTemp = summary.aaBar.values
       (for { col <- 0 until summary.k
              row <- 0 to col
-             if aVarRaw(col) != 0 & aVarRaw(row) != 0 } yield
+             if aVarRaw(col) != 0 && aVarRaw(row) != 0 } yield
         aaBarTemp(row + col * (col + 1) / 2)).toArray
     } else {
       summary.aaBar.values
     }
@@ -178,7 +182,7 @@ private[ml] class WeightedLeastSquares(
     val x = CholeskyDecomposition.solve(aa, ab)

     val (coefs, intercept) = if (fitIntercept) {
-      (x.slice(0, x.length - 1), x.last)
+      (x.init, x.last)
     } else {
       (x, 0.0)
     }

From e9c80a8e19ad44d034829c35fdf68f9b42682700 Mon Sep 17 00:00:00 2001
From: Imran Younus
Date: Mon, 14 Mar 2016 12:10:58 -0700
Subject: [PATCH 3/3] removing use of .values when getting arrays from
 DenseVector

---
 .../spark/ml/optim/WeightedLeastSquares.scala | 36 +++++++++----------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 7df39cff83ac9..26f3d2f88d789 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -111,46 +111,42 @@ private[ml] class WeightedLeastSquares(
      * correct coefficients. But, for the final result, we need to add zeros to the list of
      * coefficients corresponding to the constant features.
      */
-    val aVarRaw = summary.aVar.values
+    val aVarRaw = summary.aVar.toArray
     // this will keep track of features to keep in the model, and remove
     // features with zero variance.
-    val nzVarIndex = aVarRaw.zipWithIndex.filter(_._1 != 0).map(_._2)
+    val nzVarIndex = aVarRaw.zipWithIndex.filter(_._1 != 0.0).map(_._2)
     val nz = nzVarIndex.length
     // if there are features with zero variance, then ATA is not positive definite, and we need to
     // keep track of that.
-    val singular = summary.k > nz
+    val isSingular = summary.k > nz
     val k = if (fitIntercept) nz + 1 else nz
     val triK = nz * (nz + 1) / 2

-    val aVar = if (singular) {
-      for (i <- nzVarIndex) yield aVarRaw(i)
-    } else {
-      aVarRaw
-    }
-    val aBar = if (singular) {
-      val aBarTemp = summary.aBar.values
+    val aVar = for (i <- nzVarIndex) yield aVarRaw(i)
+    val aBar = if (isSingular) {
+      val aBarTemp = summary.aBar.toArray
       for (i <- nzVarIndex) yield aBarTemp(i)
     } else {
-      summary.aBar.values
+      summary.aBar.toArray
     }
-    val abBar = if (singular) {
-      val abBarTemp = summary.abBar.values
+    val abBar = if (isSingular) {
+      val abBarTemp = summary.abBar.toArray
       for (i <- nzVarIndex) yield {abBarTemp(i)}
     } else {
-      summary.abBar.values
+      summary.abBar.toArray
     }
     // NOTE: aaBar represents upper triangular part of A^T.A matrix in column major order.
     // We need to drop columns and rows from A^T.A corresponding to the features which have
     // zero variance. The following logic removes elements from aaBar corresponding to zero
     // variance, which effectively removes columns and rows from A^T.A.
- val aaBar = if (singular) { - val aaBarTemp = summary.aaBar.values + val aaBar = if (isSingular) { + val aaBarTemp = summary.aaBar.toArray (for { col <- 0 until summary.k row <- 0 to col - if aVarRaw(col) != 0 && aVarRaw(row) != 0 } yield + if aVarRaw(col) != 0.0 && aVarRaw(row) != 0.0 } yield aaBarTemp(row + col * (col + 1) / 2)).toArray } else { - summary.aaBar.values + summary.aaBar.toArray } // add regularization to diagonals @@ -161,7 +157,7 @@ private[ml] class WeightedLeastSquares( if (standardizeFeatures) { lambda *= aVar(j - 2) } - if (standardizeLabel && bStd != 0) { + if (standardizeLabel && bStd != 0.0) { lambda /= bStd } aaBar(i) += lambda @@ -191,7 +187,7 @@ private[ml] class WeightedLeastSquares( val aaInvDiag = (1 to k).map { i => aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray - val (coefficients, diagInvAtWA) = if (singular) { + val (coefficients, diagInvAtWA) = if (isSingular) { // if there are constant features in the data, we need to add zeros for the coefficients // for these features. val coefTemp = Array.ofDim[Double](summary.k)