@@ -67,6 +67,10 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
* This binary classifier optimizes the Hinge Loss using the OWLQN optimizer.
* Only supports L2 regularization currently.
*
* Since 3.1.0, it supports stacking instances into blocks and using GEMV for
* better performance.
* The block size will be 1.0 MB if param maxBlockSizeInMB is left at its default of 0.0.
*
*/
@Since("2.2.0")
class LinearSVC @Since("2.2.0") (
@@ -154,7 +158,7 @@ class LinearSVC @Since("2.2.0") (

/**
* Sets the value of param [[maxBlockSizeInMB]].
* Default is 0.0.
* Default is 0.0, in which case 1.0 MB will be chosen.
*
* @group expertSetParam
*/
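For context (not part of the patch itself), a minimal usage sketch of the new parameter on LinearSVC; a SparkSession and a `training` DataFrame with "label" and "features" columns are assumed to exist already:

import org.apache.spark.ml.classification.LinearSVC

// Illustrative only; `training` is an assumed DataFrame with "label" and "features" columns.
val svc = new LinearSVC()
  .setMaxIter(50)
  .setRegParam(0.1)
  .setMaxBlockSizeInMB(0.0)  // 0.0 (the default) defers to the 1.0 MB block size described above
val model = svc.fit(training)

Any positive value (for example 0.25) caps the stacked block size directly; 0.0 simply falls back to the 1.0 MB default noted in the scaladoc.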
@@ -50,7 +50,7 @@ import org.apache.spark.util.VersionUtils
private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
with HasBlockSize {
with HasMaxBlockSizeInMB {

import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

@@ -245,7 +245,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas

setDefault(regParam -> 0.0, elasticNetParam -> 0.0, maxIter -> 100, tol -> 1E-6,
fitIntercept -> true, family -> "auto", standardization -> true, threshold -> 0.5,
aggregationDepth -> 2, blockSize -> 1)
aggregationDepth -> 2, maxBlockSizeInMB -> 0.0)

protected def usingBoundConstrainedOptimization: Boolean = {
isSet(lowerBoundsOnCoefficients) || isSet(upperBoundsOnCoefficients) ||
@@ -276,6 +276,10 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas
*
* This class supports fitting traditional logistic regression model by LBFGS/OWLQN and
* bound (box) constrained logistic regression model by LBFGSB.
*
* Since 3.1.0, it supports stacking instances into blocks and using GEMV/GEMM for
* better performance.
* The block size will be 1.0 MB if param maxBlockSizeInMB is left at its default of 0.0.
*/
@Since("1.2.0")
class LogisticRegression @Since("1.2.0") (
@@ -426,22 +430,13 @@ class LogisticRegression @Since("1.2.0") (
def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)

/**
* Set block size for stacking input data in matrices.
* If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
* If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
* will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
* Recommended size is between 10 and 1000. An appropriate choice of the block size depends
* on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
* f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
* Note that existing BLAS implementations are mainly optimized for dense matrices, if the
* input dataset is sparse, stacking may bring no performance gain, the worse is possible
* performance regression.
* Default is 1.
* Sets the value of param [[maxBlockSizeInMB]].
* Default is 0.0, in which case 1.0 MB will be chosen.
*
* @group expertSetParam
*/
@Since("3.1.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)

private def assertBoundConstrainedOptimizationParamsValid(
numCoefficientSets: Int,
@@ -495,31 +490,24 @@ class LogisticRegression @Since("1.2.0") (
this
}

override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
train(dataset, handlePersistence)
}

protected[spark] def train(
dataset: Dataset[_],
handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
probabilityCol, regParam, elasticNetParam, standardization, threshold, thresholds, maxIter,
tol, fitIntercept, blockSize)
tol, fitIntercept, maxBlockSizeInMB)

if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
}

val instances = extractInstances(dataset)
.setName("training instances")

if (handlePersistence && $(blockSize) == 1) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}

var requestedMetrics = Seq("mean", "std", "count")
if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
val (summarizer, labelSummarizer) = Summarizer
.getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
.getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))

val numFeatures = summarizer.mean.size
val histogram = labelSummarizer.histogram
@@ -547,14 +535,13 @@ class LogisticRegression @Since("1.2.0") (
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
if ($(blockSize) > 1) {
val scale = 1.0 / summarizer.count / numFeatures
val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
instr.logNamedValue("sparsity", sparsity.toString)
if (sparsity > 0.5) {
instr.logWarning(s"sparsity of input dataset is $sparsity, " +
s"which may hurt performance in high-level BLAS.")
}

var actualBlockSizeInMB = $(maxBlockSizeInMB)
if (actualBlockSizeInMB == 0) {
// TODO: for Multinomial logistic regression, take numClasses into account
actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0")
instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
}

val isMultinomial = checkMultinomial(numClasses)
@@ -584,7 +571,6 @@ class LogisticRegression @Since("1.2.0") (
} else {
Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
return createModel(dataset, numClasses, coefMatrix, interceptVec, Array(0.0))
}

@@ -636,14 +622,9 @@ class LogisticRegression @Since("1.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val (allCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
trainOnRows(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix,
regularization, optimizer)
} else {
trainOnBlocks(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix,
regularization, optimizer)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
val (allCoefficients, objectiveHistory) =
trainImpl(instances, actualBlockSizeInMB, featuresStd, numClasses,
initialCoefWithInterceptMatrix, regularization, optimizer)

if (allCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -949,40 +930,9 @@ class LogisticRegression @Since("1.2.0") (
initialCoefWithInterceptMatrix
}

private def trainOnRows(
instances: RDD[Instance],
featuresStd: Array[Double],
numClasses: Int,
initialCoefWithInterceptMatrix: Matrix,
regularization: Option[L2Regularization],
optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = {
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
checkMultinomial(numClasses))(_)

val costFun = new RDDLossFunction(instances, getAggregatorFunc,
regularization, $(aggregationDepth))
val states = optimizer.iterations(new CachedDiffFunction(costFun),
new BDV[Double](initialCoefWithInterceptMatrix.toArray))

/*
Note that in Logistic Regression, the objective history (loss + regularization)
is log-likelihood which is invariant under feature standardization. As a result,
the objective history from optimizer is the same as the one in the original space.
*/
val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
bcFeaturesStd.destroy()

(if (state == null) null else state.x.toArray, arrayBuilder.result)
}

private def trainOnBlocks(
private def trainImpl(
instances: RDD[Instance],
actualBlockSizeInMB: Double,
featuresStd: Array[Double],
numClasses: Int,
initialCoefWithInterceptMatrix: Matrix,
@@ -996,9 +946,11 @@ class LogisticRegression @Since("1.2.0") (
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))

val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training blocks (blockSize=${$(blockSize)})")
.setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")

val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept),
checkMultinomial(numClasses))(_)
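To make the block-size resolution above concrete, here is a small self-contained sketch (an illustrative stand-in, not the actual InstanceBlock helper) of how a maxBlockSizeInMB value turns into the maxMemUsage byte budget handed to blokifyWithMaxMemUsage, assuming the 1.0 MB default stated in the scaladoc:

// Hypothetical standalone helper mirroring the logic in train(); names are illustrative.
def resolveMaxMemUsage(maxBlockSizeInMB: Double, defaultMB: Double = 1.0): Long = {
  // 0.0 means "unset": fall back to the default block size
  val actualMB = if (maxBlockSizeInMB == 0.0) defaultMB else maxBlockSizeInMB
  require(actualMB > 0, s"block size in MB must be positive, got $actualMB")
  (actualMB * 1024L * 1024L).ceil.toLong
}

resolveMaxMemUsage(0.0)   // 1048576 bytes, i.e. the 1.0 MB default
resolveMaxMemUsage(0.25)  // 262144 bytes

The same knob is exposed on LogisticRegression through setMaxBlockSizeInMB, replacing the row-count based setBlockSize heuristic removed in this diff.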
@@ -18,8 +18,8 @@
package org.apache.spark.ml.optim.aggregator

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.regression.AFTPoint

/**
* AFTAggregator computes the gradient and loss for a AFT loss function,
@@ -108,26 +108,26 @@ import org.apache.spark.ml.regression.AFTPoint
private[ml] class AFTAggregator(
bcFeaturesStd: Broadcast[Array[Double]],
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] {
extends DifferentiableLossAggregator[Instance, AFTAggregator] {

protected override val dim: Int = bcCoefficients.value.size

/**
* Add a new training data to this AFTAggregator, and update the loss and gradient
* of the objective function.
*
* @param data The AFTPoint representation for one data point to be added into this aggregator.
* @param data The Instance representation for one data point to be added into this aggregator.
* @return This AFTAggregator object.
*/
def add(data: AFTPoint): this.type = {
def add(data: Instance): this.type = {
val coefficients = bcCoefficients.value.toArray
val intercept = coefficients(dim - 2)
// sigma is the scale parameter of the AFT model
val sigma = math.exp(coefficients(dim - 1))

val xi = data.features
val ti = data.label
val delta = data.censor
val delta = data.weight

require(ti > 0.0, "The lifetime or label should be greater than 0.")

@@ -176,7 +176,7 @@ private[ml] class AFTAggregator(
*/
private[ml] class BlockAFTAggregator(
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[(Matrix, Array[Double], Array[Double]),
extends DifferentiableLossAggregator[InstanceBlock,
BlockAFTAggregator] {

protected override val dim: Int = bcCoefficients.value.size
@@ -196,16 +196,13 @@ private[ml] class BlockAFTAggregator(
*
* @return This BlockAFTAggregator object.
*/
def add(block: (Matrix, Array[Double], Array[Double])): this.type = {
val (matrix, labels, censors) = block
require(matrix.isTransposed)
require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${matrix.numCols}.")
require(labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.")

val size = matrix.numRows
require(labels.length == size && censors.length == size)
def add(block: InstanceBlock): this.type = {
require(block.matrix.isTransposed)
require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
require(block.labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.")

val size = block.size
val intercept = coefficientsArray(dim - 2)
// sigma is the scale parameter of the AFT model
val sigma = math.exp(coefficientsArray(dim - 1))
@@ -216,26 +213,30 @@ private[ml] class BlockAFTAggregator(
} else {
Vectors.zeros(size).toDense
}
BLAS.gemv(1.0, matrix, linear, 1.0, vec)
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)

// in-place convert margins to gradient scales
// then, vec represents gradient scales
var localLossSum = 0.0
var i = 0
var sigmaGradSum = 0.0
while (i < size) {
val ti = labels(i)
val delta = censors(i)
val ti = block.getLabel(i)
// here use Instance.weight to store censor for convenience
val delta = block.getWeight(i)
val margin = vec(i)
val epsilon = (math.log(ti) - margin) / sigma
val expEpsilon = math.exp(epsilon)
lossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon
localLossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon
val multiplier = (delta - expEpsilon) / sigma
vec.values(i) = multiplier
sigmaGradSum += delta + multiplier * sigma * epsilon
i += 1
}
lossSum += localLossSum
weightSum += size

matrix match {
block.matrix match {
case dm: DenseMatrix =>
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
vec.values, 1, 1.0, gradientSumArray, 1)
Expand All @@ -249,7 +250,6 @@ private[ml] class BlockAFTAggregator(

if (fitIntercept) gradientSumArray(dim - 2) += vec.values.sum
gradientSumArray(dim - 1) += sigmaGradSum
weightSum += size

this
}
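For reference, the per-instance objective accumulated by both AFTAggregator.add and BlockAFTAggregator.add can be written out as follows (reconstructed from the code above, not part of the diff). Here m_i is the margin x_i^T β plus the intercept, t_i the lifetime label, δ_i the censor indicator carried in the weight field, and σ the scale parameter:

\[
\varepsilon_i = \frac{\log t_i - m_i}{\sigma}, \qquad
\ell_i = \delta_i \log \sigma - \delta_i \varepsilon_i + e^{\varepsilon_i}
\]
\[
\frac{\partial \ell_i}{\partial m_i} = \frac{\delta_i - e^{\varepsilon_i}}{\sigma}, \qquad
\frac{\partial \ell_i}{\partial \log \sigma} = \delta_i + \varepsilon_i \left( \delta_i - e^{\varepsilon_i} \right)
\]

The first derivative is the multiplier written back into vec, and the second is the per-instance term added to sigmaGradSum.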
@@ -162,24 +162,26 @@ private[ml] class BlockHingeAggregator(

// in-place convert dotProducts to gradient scales
// then, vec represents gradient scales
var localLossSum = 0.0
var i = 0
while (i < size) {
val weight = block.getWeight(i)
if (weight > 0) {
weightSum += weight
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val label = block.getLabel(i)
val labelScaled = label + label - 1.0
val loss = (1.0 - labelScaled * vec(i)) * weight
if (loss > 0) {
lossSum += loss
localLossSum += loss
val gradScale = -labelScaled * weight
vec.values(i) = gradScale
} else { vec.values(i) = 0.0 }
} else { vec.values(i) = 0.0 }
i += 1
}
lossSum += localLossSum
weightSum += block.weightIter.sum

// predictions are all correct, no gradient signal
if (vec.values.forall(_ == 0)) return this
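A brief reconstruction (not part of the patch) of the weighted hinge loss described by the comment in BlockHingeAggregator, for label y in {0, 1}, margin f_w(x) and instance weight w:

\[
\ell(x, y) = w \cdot \max\bigl(0,\; 1 - (2y - 1)\, f_w(x)\bigr), \qquad
\frac{\partial \ell}{\partial f_w(x)} =
\begin{cases}
-(2y - 1)\, w & \text{if } (2y - 1)\, f_w(x) < 1 \\
0 & \text{otherwise}
\end{cases}
\]

This is exactly the gradScale stored in vec.values(i) when the loss is positive, and zero otherwise.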