mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala (16 additions, 0 deletions)
@@ -78,4 +78,20 @@ private[spark] object Utils {
i * (i + 1) / 2 + j
}
}

/**
 * When `x` is positive and large, computing `math.log(1 + math.exp(x))` leads to arithmetic
 * overflow; this happens as soon as `x > 709.78`, which is not a very large number. The
 * overflow is avoided by rewriting the formula as `x + math.log1p(math.exp(-x))` when `x`
 * is positive.
 * @param x a floating-point value as input.
 * @return the result of `math.log(1 + math.exp(x))`.
 */
def log1pExp(x: Double): Double = {
if (x > 0) {
x + math.log1p(math.exp(-x))
} else {
math.log1p(math.exp(x))
}
}
}
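A quick REPL-style check of why the rewrite matters (a sketch, not part of the patch): the naive form overflows as soon as math.exp does, while the rewritten form degrades gracefully because math.exp(-x) merely underflows to zero.

val x = 800.0
math.log(1 + math.exp(x))     // Double.PositiveInfinity: math.exp(800) overflows
x + math.log1p(math.exp(-x))  // 800.0: math.exp(-800) underflows to 0.0 and log1p(0.0) == 0.0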

(A further large file diff is collapsed here and not rendered.)

@@ -132,15 +132,15 @@ private[ml] class BlockHingeAggregator(

@transient private lazy val linear = {
val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray
-Vectors.dense(linear).toDense
+Vectors.dense(linear)
}

/**
- * Add a new training instance block to this HingeAggregator, and update the loss and gradient
- * of the objective function.
+ * Add a new training instance block to this BlockHingeAggregator, and update the loss and
+ * gradient of the objective function.
 *
 * @param block The InstanceBlock to be added.
- * @return This HingeAggregator object.
+ * @return This BlockHingeAggregator object.
 */
def add(block: InstanceBlock): this.type = {
require(block.matrix.isTransposed)
@@ -163,7 +163,6 @@
// in-place convert dotProducts to gradient scales
// then, vec represents gradient scales
var i = 0
-var interceptGradSum = 0.0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
@@ -172,12 +171,11 @@
// Therefore the gradient is -(2y - 1)*x
val label = block.getLabel(i)
val labelScaled = label + label - 1.0
-val loss = (1.0 - labelScaled * vec.values(i)) * weight
+val loss = (1.0 - labelScaled * vec(i)) * weight
if (loss > 0) {
lossSum += loss
val gradScale = -labelScaled * weight
vec.values(i) = gradScale
-if (fitIntercept) interceptGradSum += gradScale
} else { vec.values(i) = 0.0 }
} else { vec.values(i) = 0.0 }
i += 1
@@ -190,20 +188,20 @@
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
vec.values, 1, 1.0, gradientSumArray, 1)
-if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum

case sm: SparseMatrix if fitIntercept =>
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
gradientSumArray, 1)
-gradientSumArray(numFeatures) += interceptGradSum

case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
}

+if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum

this
}
}
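The removed interceptGradSum bookkeeping is subsumed by the final vec.values.sum: after the loop, vec holds each sample's gradient scale (zero where the hinge loss vanishes), and since the intercept column is implicitly all ones, its gradient is just the sum of those scales. A standalone sketch of the equivalence (hypothetical values, not PR code):

val gradScales = Array(0.0, -1.5, 0.0, 2.0)  // hypothetical contents of vec.values after the loop
var interceptGradSum = 0.0                   // old approach: running sum inside the loop
gradScales.foreach(g => interceptGradSum += g)
assert(interceptGradSum == gradScales.sum)   // new approach: one pass over vec.values at the end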
@@ -18,9 +18,9 @@ package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.Instance
-import org.apache.spark.ml.linalg.{DenseVector, Vector}
-import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.impl.Utils
+import org.apache.spark.ml.linalg._

/**
* LogisticAggregator computes the gradient and loss for binary or multinomial logistic (softmax)
@@ -203,7 +203,7 @@ private[ml] class LogisticAggregator(
s"in {1, 2} but found $numClasses.")
}

-@transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match {
+@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
s"got type ${bcCoefficients.value.getClass}.)")
@@ -247,9 +247,9 @@ private[ml] class LogisticAggregator(

if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
-lossSum += weight * MLUtils.log1pExp(margin)
+lossSum += weight * Utils.log1pExp(margin)
} else {
-lossSum += weight * (MLUtils.log1pExp(margin) - margin)
+lossSum += weight * (Utils.log1pExp(margin) - margin)
}
}

@@ -364,3 +364,237 @@ private[ml] class LogisticAggregator(
}
}
}


/**
 * BlockLogisticAggregator computes the gradient and loss used in logistic classification
 * for blocks of instances stored in a sparse or dense matrix, in an online fashion.
 *
 * Two BlockLogisticAggregators can be merged together to obtain a summary of the loss and
 * gradient over the corresponding joint dataset.
 *
 * NOTE: The feature values are expected to be standardized before computation.
 *
 * @param bcCoefficients The coefficients corresponding to the features.
 * @param fitIntercept Whether to fit an intercept term.
 */
private[ml] class BlockLogisticAggregator(
numFeatures: Int,
numClasses: Int,
fitIntercept: Boolean,
multinomial: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[InstanceBlock, BlockLogisticAggregator] with Logging {

if (multinomial && numClasses <= 2) {
logInfo(s"Multinomial logistic regression for binary classification yields separate " +
s"coefficients for positive and negative classes. When no regularization is applied, the" +
s"result will be effectively the same as binary logistic regression. When regularization" +
s"is applied, multinomial loss will produce a result different from binary loss.")
}

private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
private val coefficientSize = bcCoefficients.value.size
protected override val dim: Int = coefficientSize

if (multinomial) {
require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " +
s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize")
} else {
require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " +
s"coefficients but got $coefficientSize")
require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " +
s"in {1, 2} but found $numClasses.")
}

@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
s"got type ${bcCoefficients.value.getClass}.)")
}

@transient private lazy val binaryLinear = (multinomial, fitIntercept) match {
case (false, true) => Vectors.dense(coefficientsArray.take(numFeatures))
case (false, false) => Vectors.dense(coefficientsArray)
case _ => null
}

@transient private lazy val multinomialLinear = (multinomial, fitIntercept) match {
case (true, true) =>
Matrices.dense(numClasses, numFeatures,
coefficientsArray.take(numClasses * numFeatures)).toDense
case (true, false) =>
Matrices.dense(numClasses, numFeatures, coefficientsArray).toDense
case _ => null
}
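
// Layout note with an illustrative accessor (not part of this patch): coefficientsArray
// stores the multinomial weights column-major as a numClasses x numFeatures matrix, so
// the class index varies fastest, and when fitIntercept is set the numClasses intercepts
// are appended starting at offset numClasses * numFeatures.
private def coefficientAt(cls: Int, feature: Int): Double =
  coefficientsArray(feature * numClasses + cls)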

/**
* Add a new training instance block to this BlockLogisticAggregator, and update the loss and
* gradient of the objective function.
*
* @param block The InstanceBlock of data points to be added.
* @return This BlockLogisticAggregator object.
*/
def add(block: InstanceBlock): this.type = {
require(block.matrix.isTransposed)
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.weightIter.forall(_ >= 0),
s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")

if (block.weightIter.forall(_ == 0)) return this

if (multinomial) {
multinomialUpdateInPlace(block)
} else {
binaryUpdateInPlace(block)
}

this
}

/** Update gradient and loss using binary loss function. */
private def binaryUpdateInPlace(block: InstanceBlock): Unit = {
val size = block.size

// vec here represents margins or negative dotProducts
val vec = if (fitIntercept) {
Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense
} else {
Vectors.zeros(size).toDense
}
BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec)

// in-place convert margins to multiplier
// then, vec represents multiplier
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)
val margin = vec(i)
if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
lossSum += weight * Utils.log1pExp(margin)
} else {
lossSum += weight * (Utils.log1pExp(margin) - margin)
}
val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
vec.values(i) = multiplier
} else { vec.values(i) = 0.0 }
i += 1
}

// predictions are all correct, no gradient signal
if (vec.values.forall(_ == 0)) return

block.matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
vec.values, 1, 1.0, gradientSumArray, 1)

case sm: SparseMatrix if fitIntercept =>
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
gradientSumArray, 1)

case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
}
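
// Scalar reference for one sample (an illustrative helper, not part of this patch).
// With the sign convention above, margin = -(x dot beta + intercept); for label 1 the
// loss is log1pExp(margin) and for label 0 it is log1pExp(margin) - margin, while the
// multiplier is the derivative of the weighted loss w.r.t. the raw score x dot beta + intercept.
private def binaryLossAndMultiplier(
    margin: Double, label: Double, weight: Double): (Double, Double) = {
  val loss = if (label > 0) {
    weight * Utils.log1pExp(margin)
  } else {
    weight * (Utils.log1pExp(margin) - margin)
  }
  val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
  (loss, multiplier)
}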

/** Update gradient and loss using multinomial (softmax) loss function. */
private def multinomialUpdateInPlace(block: InstanceBlock): Unit = {
val size = block.size

// mat here represents margins, shape: S X C
val mat = DenseMatrix.zeros(size, numClasses)
if (fitIntercept) {
val offset = numClasses * numFeatures
var j = 0
while (j < numClasses) {
val intercept = coefficientsArray(offset + j)
var i = 0
while (i < size) { mat.update(i, j, intercept); i += 1 }
j += 1
}
}
BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat)

// in-place convert margins to multipliers
// then, mat represents multipliers
var i = 0
val tmp = Array.ofDim[Double](numClasses)
val interceptGradSumArr = if (fitIntercept) Array.ofDim[Double](numClasses) else null
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
val label = block.getLabel(i)

var maxMargin = Double.NegativeInfinity
var j = 0
while (j < numClasses) {
tmp(j) = mat(i, j)
maxMargin = math.max(maxMargin, tmp(j))
j += 1
}

// marginOfLabel is margins(label) in the formula
val marginOfLabel = tmp(label.toInt)

var sum = 0.0
j = 0
while (j < numClasses) {
if (maxMargin > 0) tmp(j) -= maxMargin
val exp = math.exp(tmp(j))
sum += exp
tmp(j) = exp
j += 1
}

j = 0
while (j < numClasses) {
val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0))
mat.update(i, j, multiplier)
if (fitIntercept) interceptGradSumArr(j) += multiplier
j += 1
}

if (maxMargin > 0) {
lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin)
} else {
lossSum += weight * (math.log(sum) - marginOfLabel)
}
} else {
var j = 0; while (j < numClasses) { mat.update(i, j, 0.0); j += 1 }
}
i += 1
}

// mat (multipliers): S X C, dense, not transposed ("N")
// mat.transpose (multipliers): C X S, dense, transposed ("T")
// block.matrix: S X F, sparse or dense, transposed ("T")
// gradientSumArray viewed as a matrix: C X FPI (numFeaturesPlusIntercept), dense, not transposed ("N")
block.matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemm("T", "T", numClasses, numFeatures, size, 1.0,
mat.values, size, dm.values, numFeatures, 1.0, gradientSumArray, numClasses)

case sm: SparseMatrix =>
// linearGradSumMat = matrix.T X mat
val linearGradSumMat = DenseMatrix.zeros(numFeatures, numClasses)
BLAS.gemm(1.0, sm.transpose, mat, 0.0, linearGradSumMat)
linearGradSumMat.foreachActive { (i, j, v) => gradientSumArray(i * numClasses + j) += v }
}

if (fitIntercept) {
BLAS.getBLAS(numClasses).daxpy(numClasses, 1.0, interceptGradSumArr, 0, 1,
gradientSumArray, numClasses * numFeatures, 1)
}
}
}
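The maxMargin handling in multinomialUpdateInPlace is the standard log-sum-exp stabilization: subtracting the largest margin before exponentiating leaves the probabilities and the loss unchanged while keeping math.exp in range. A standalone sketch of the same computation (plain Scala, independent of the patch; the patch applies the shift only when maxMargin > 0, which is algebraically identical):

// loss = log(sum_j exp(m_j)) - m_label, computed so that no math.exp call overflows
def softmaxLoss(margins: Array[Double], label: Int): Double = {
  val maxMargin = margins.max
  val sum = margins.map(m => math.exp(m - maxMargin)).sum
  math.log(sum) + maxMargin - margins(label)
}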
@@ -522,6 +522,35 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
testProbClassificationModelSingleProbPrediction(mlorModel, smallMultinomialDataset)
}

test("LogisticRegression on blocks") {
for (dataset <- Seq(smallBinaryDataset, smallMultinomialDataset, binaryDataset,
multinomialDataset, multinomialDatasetWithZeroVar); fitIntercept <- Seq(true, false)) {
val mlor = new LogisticRegression()
.setFitIntercept(fitIntercept)
.setMaxIter(5)
.setFamily("multinomial")
val model = mlor.fit(dataset)
Seq(4, 16, 64).foreach { blockSize =>
val model2 = mlor.setBlockSize(blockSize).fit(dataset)
assert(model.interceptVector ~== model2.interceptVector relTol 1e-6)
assert(model.coefficientMatrix ~== model2.coefficientMatrix relTol 1e-6)
}
}

for (dataset <- Seq(smallBinaryDataset, binaryDataset); fitIntercept <- Seq(true, false)) {
val blor = new LogisticRegression()
.setFitIntercept(fitIntercept)
.setMaxIter(5)
.setFamily("binomial")
val model = blor.fit(dataset)
Seq(4, 16, 64).foreach { blockSize =>
val model2 = blor.setBlockSize(blockSize).fit(dataset)
assert(model.intercept ~== model2.intercept relTol 1e-6)
assert(model.coefficients ~== model2.coefficients relTol 1e-6)
}
}
}

test("coefficients and intercept methods") {
val mlr = new LogisticRegression().setMaxIter(1).setFamily("multinomial")
val mlrModel = mlr.fit(smallMultinomialDataset)