@@ -80,23 +80,16 @@ private[ml] class WeightedLeastSquares(
val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_))
summary.validate()
logInfo(s"Number of instances: ${summary.count}.")
val k = if (fitIntercept) summary.k + 1 else summary.k
val triK = summary.triK
val wSum = summary.wSum
val bBar = summary.bBar
val bStd = summary.bStd
val aBar = summary.aBar
val aVar = summary.aVar
val abBar = summary.abBar
val aaBar = summary.aaBar
val aaValues = aaBar.values

if (bStd == 0) {
if (fitIntercept) {
logWarning(s"The standard deviation of the label is zero, so the coefficients will be " +
s"zeros and the intercept will be the mean of the label; as a result, " +
s"training is not needed.")
val coefficients = new DenseVector(Array.ofDim(k-1))
val coefficients = new DenseVector(Array.ofDim(summary.k))
val intercept = bBar
val diagInvAtWA = new DenseVector(Array(0D))
return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
@@ -108,6 +101,53 @@ private[ml] class WeightedLeastSquares(
"Consider setting fitIntercept=true.")
}
}
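For context (my note, not part of the diff): bStd == 0 means the label is constant under the instance weights, so the trivial model that always predicts bBar already attains zero residual; all-zero coefficients plus intercept bBar are therefore optimal without running the solver.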
[Review comment from a Contributor]: Can you make this comment adhere to the style like here?

/*
* If more than one of the features in the data are constant (i.e. the data matrix has
* constant columns), then A^T.A is no longer positive definite and the Cholesky
* decomposition fails (because the normal equation does not have a unique solution).
* In order to find a solution, we need to drop the constant columns from the data
* matrix, or, equivalently, drop the corresponding columns and rows from the A^T.A
* matrix. Once we drop those rows/columns from the A^T.A matrix, the Cholesky
* decomposition will produce the correct coefficients. But for the final result, we
* need to add zeros to the list of coefficients at the positions of the constant
* features.
*/
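A quick sketch of the algebra behind this comment (my addition): a constant feature column equals c times the all-ones vector, so any two constant columns are linearly dependent, and with fitIntercept even a single constant column is dependent on the appended all-ones intercept column. Linear dependence among the columns of A gives rank(A^T.A) = rank(A) < k, so A^T.A has a zero eigenvalue and the Cholesky factorization, which requires strict positive definiteness, fails.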
val aVarRaw = summary.aVar.toArray
// this keeps track of the features to retain in the model; features with zero
// variance (constant columns) are removed.
val nzVarIndex = aVarRaw.zipWithIndex.filter(_._1 != 0.0).map(_._2)
val nz = nzVarIndex.length
// if there are features with zero variance, then A^T.A is not positive definite, so we
// need to keep track of that (the problem is singular).
val isSingular = summary.k > nz
val k = if (fitIntercept) nz + 1 else nz
val triK = nz * (nz + 1) / 2

val aVar = for (i <- nzVarIndex) yield aVarRaw(i)
val aBar = if (isSingular) {
val aBarTemp = summary.aBar.toArray
for (i <- nzVarIndex) yield aBarTemp(i)
} else {
summary.aBar.toArray
}
val abBar = if (isSingular) {
val abBarTemp = summary.abBar.toArray
for (i <- nzVarIndex) yield abBarTemp(i)
} else {
summary.abBar.toArray
}
// NOTE: aaBar stores the upper triangular part of the A^T.A matrix in column-major order.
// We need to drop the columns and rows of A^T.A corresponding to the features which have
// zero variance. The following logic removes the elements of aaBar corresponding to
// zero-variance features, which effectively removes those columns and rows from A^T.A.
val aaBar = if (isSingular) {
val aaBarTemp = summary.aaBar.toArray
(for { col <- 0 until summary.k
row <- 0 to col
if aVarRaw(col) != 0.0 && aVarRaw(row) != 0.0 } yield
aaBarTemp(row + col * (col + 1) / 2)).toArray
} else {
summary.aaBar.toArray
}
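To make the packed layout concrete, here is a minimal standalone sketch (script-style Scala, no Spark dependencies, made-up matrix values): element (row, col) of the upper triangle, with row <= col, lives at index row + col * (col + 1) / 2, and dropping a feature keeps only the entries whose row and column both survive.

def packedIndex(row: Int, col: Int): Int = row + col * (col + 1) / 2

// 3x3 symmetric matrix, upper triangle packed column by column:
// (0,0)->0, (0,1)->1, (1,1)->2, (0,2)->3, (1,2)->4, (2,2)->5
val packed = Array(1.0, 2.0, 4.0, 3.0, 5.0, 6.0)
assert(packed(packedIndex(1, 2)) == 5.0)

// Drop row/column 1 (say, a zero-variance feature) and re-pack the survivors
// in order, mirroring the filtered for-comprehension above.
val keep = Array(0, 2)
val dropped = for {
  c <- keep.indices
  r <- 0 to c
} yield packed(packedIndex(keep(r), keep(c)))
assert(dropped == Seq(1.0, 3.0, 6.0))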

// add regularization to diagonals
var i = 0
@@ -117,37 +157,50 @@ private[ml] class WeightedLeastSquares(
if (standardizeFeatures) {
lambda *= aVar(j - 2)
}
if (standardizeLabel && bStd != 0) {
if (standardizeLabel && bStd != 0.0) {
lambda /= bStd
}
aaValues(i) += lambda
aaBar(i) += lambda
i += j
j += 1
}
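A quick standalone check (my sketch; k = 4 is an assumed size, and the loop bound here stands in for the one hidden in the collapsed hunk) that the i += j; j += 1 walk visits exactly the packed diagonal positions, i.e. index c * (c + 3) / 2 for column c:

val k = 4
var i = 0
var j = 2
val diagIdx = Array.newBuilder[Int]
while (j <= k + 1) {
  diagIdx += i
  i += j
  j += 1
}
// for k = 4 the visited indices are 0, 2, 5, 9
assert(diagIdx.result().sameElements((0 until k).map(c => c * (c + 3) / 2)))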

val aa = if (fitIntercept) {
Array.concat(aaBar.values, aBar.values, Array(1.0))
Array.concat(aaBar, aBar, Array(1.0))
} else {
aaBar.values
aaBar
}
val ab = if (fitIntercept) {
Array.concat(abBar.values, Array(bBar))
Array.concat(abBar, Array(bBar))
} else {
abBar.values
abBar
}

val x = CholeskyDecomposition.solve(aa, ab)

val (coefs, intercept) = if (fitIntercept) {
(x.init, x.last)
} else {
(x, 0.0)
}
val aaInv = CholeskyDecomposition.inverse(aa, k)

// aaInv is a packed upper triangular matrix; here we extract all of its diagonal elements
val diagInvAtWA = new DenseVector((1 to k).map { i =>
aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray)
val aaInvDiag = (1 to k).map { i =>
aaInv(i + (i - 1) * i / 2 - 1) / wSum }.toArray
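
The 1-indexed diagonal lookup above can be verified in isolation (my sketch): with c = i - 1, the packed diagonal index c * (c + 3) / 2 rearranges to i + (i - 1) * i / 2 - 1.

for (i <- 1 to 6) {
  val c = i - 1
  assert(i + (i - 1) * i / 2 - 1 == c * (c + 3) / 2)
}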

val (coefficients, intercept) = if (fitIntercept) {
(new DenseVector(x.slice(0, x.length - 1)), x.last)
val (coefficients, diagInvAtWA) = if (isSingular) {
// if there are constant features in the data, we need to insert zeros into the
// coefficient vector (and into diagInvAtWA) at the positions of those features.
val coefTemp = Array.ofDim[Double](summary.k)
val diagTemp = Array.ofDim[Double](summary.k)
var i = 0
while (i < nz) {
coefTemp(nzVarIndex(i)) = coefs(i)
diagTemp(nzVarIndex(i)) = aaInvDiag(i)
i += 1
}
(new DenseVector(coefTemp), new DenseVector(diagTemp))
} else {
(new DenseVector(x), 0.0)
(new DenseVector(coefs), new DenseVector(aaInvDiag))
}

new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA)
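
And the scatter-back step in isolation (a sketch with made-up sizes and values, not the PR's data): coefficients solved on the reduced problem are expanded to the full feature space, with zeros at the positions of the dropped constant features.

val fullK = 4
val nzVarIndex = Array(0, 2)   // surviving (non-constant) feature positions
val coefs = Array(1.5, -0.75)  // solved on the reduced problem
val expanded = Array.ofDim[Double](fullK)
for (i <- nzVarIndex.indices) expanded(nzVarIndex(i)) = coefs(i)
assert(expanded.sameElements(Array(1.5, 0.0, -0.75, 0.0)))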
@@ -39,6 +39,7 @@ class LinearRegressionSuite
@transient var datasetWithWeight: DataFrame = _
@transient var datasetWithWeightConstantLabel: DataFrame = _
@transient var datasetWithWeightZeroLabel: DataFrame = _
@transient var datasetWithWeightConstantFeatures: DataFrame = _

/*
In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
@@ -117,6 +118,21 @@ class LinearRegressionSuite
Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
), 2))

/*
R code:
A <- matrix(c(0, 1, 2, 3, 9, 9, 9, 9, 5, 7, 11, 13, 6, 6, 6, 6), 4, 4)
b <- c(17, 19, 23, 29)
w <- c(1, 2, 3, 4)
df.const.feature <- as.data.frame(cbind(A, b))
*/
datasetWithWeightConstantFeatures = sqlContext.createDataFrame(
sc.parallelize(Seq(
Instance(17.0, 1.0, Vectors.dense(0.0, 9.0, 5.0, 6.0)),
Instance(19.0, 2.0, Vectors.dense(1.0, 9.0, 7.0, 6.0)),
Instance(23.0, 3.0, Vectors.dense(2.0, 9.0, 11.0, 6.0)),
Instance(29.0, 4.0, Vectors.dense(3.0, 9.0, 13.0, 6.0))
), 2))
}

test("params") {
@@ -622,6 +638,36 @@ class LinearRegressionSuite
}
}

test("linear regression model with constant features") {
/*
R code:
for (intercept in c(TRUE, FALSE)) {
model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0, alpha=0,
thresh=1E-20)
print(as.vector(coef(model)))
}
[1] 18.08 6.08 0.00 -0.60 0.00
[1] 0.000000 -3.727121 0.000000 3.009983 0.000000
*/
val expected = Seq(
Vectors.dense(18.08, 6.08, 0.0, -0.60, 0.0),
Vectors.dense(0.0, -3.727121, 0.0, 3.009983, 0.0))
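// Note: the zeros fall on the second and fourth features, i.e. the constant 9.0 and
// 6.0 columns of datasetWithWeightConstantFeatures, matching the scatter-back step.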
Seq("auto", "l-bfgs", "normal").foreach { solver =>
var idx = 0
for (fitIntercept <- Seq(true, false)) {
val model = new LinearRegression()
.setFitIntercept(fitIntercept)
.setWeightCol("weight")
.setSolver(solver)
.fit(datasetWithWeightConstantFeatures)
val actual = Vectors.dense(model.intercept, model.coefficients(0),
model.coefficients(1), model.coefficients(2), model.coefficients(3))
assert(actual ~== expected(idx) absTol 1e-4)
idx += 1
}
}
}

test("regularized linear regression through origin with constant label") {
// The problem is ill-defined if fitIntercept=false, regParam is non-zero.
// An exception is thrown in this case.