
Commit f8782a4

aft_lir_lor
rename param add comment

1 parent a3d2954 commit f8782a4

File tree

12 files changed: +170 -329 lines changed


mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala

Lines changed: 29 additions & 81 deletions

@@ -50,7 +50,7 @@ import org.apache.spark.util.VersionUtils
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
   with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
   with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
-  with HasBlockSize {
+  with HasMaxBlockSizeInMB {

   import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames

@@ -245,7 +245,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams

   setDefault(regParam -> 0.0, elasticNetParam -> 0.0, maxIter -> 100, tol -> 1E-6,
     fitIntercept -> true, family -> "auto", standardization -> true, threshold -> 0.5,
-    aggregationDepth -> 2, blockSize -> 1)
+    aggregationDepth -> 2, maxBlockSizeInMB -> 0.0)

   protected def usingBoundConstrainedOptimization: Boolean = {
     isSet(lowerBoundsOnCoefficients) || isSet(upperBoundsOnCoefficients) ||
@@ -426,22 +426,13 @@ class LogisticRegression @Since("1.2.0") (
   def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)

   /**
-   * Set block size for stacking input data in matrices.
-   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
-   * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
-   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
-   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
-   * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
-   * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
-   * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
-   * input dataset is sparse, stacking may bring no performance gain, the worse is possible
-   * performance regression.
-   * Default is 1.
+   * Sets the value of param [[maxBlockSizeInMB]].
+   * Default is 0.0.
    *
    * @group expertSetParam
    */
   @Since("3.1.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)

   private def assertBoundConstrainedOptimizationParamsValid(
       numCoefficientSets: Int,
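Note on the hunk above: the block size is now budgeted in megabytes of memory rather than in stacked rows, and the default 0.0 means "let training infer a size". A minimal usage sketch of the renamed setter (the surrounding values are illustrative; only setMaxBlockSizeInMB comes from this commit):

    import org.apache.spark.ml.classification.LogisticRegression

    // maxBlockSizeInMB caps each stacked block at roughly this many
    // megabytes; the default 0.0 lets training infer a size on its own.
    val lr = new LogisticRegression()
      .setMaxIter(100)
      .setMaxBlockSizeInMB(0.25) // illustrative budget: 0.25 MB per block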
@@ -495,31 +486,24 @@ class LogisticRegression @Since("1.2.0") (
     this
   }

-  override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    train(dataset, handlePersistence)
-  }
-
   protected[spark] def train(
-      dataset: Dataset[_],
-      handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
+      dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
       probabilityCol, regParam, elasticNetParam, standardization, threshold, thresholds, maxIter,
-      tol, fitIntercept, blockSize)
+      tol, fitIntercept, maxBlockSizeInMB)
+
+    if (dataset.storageLevel != StorageLevel.NONE) {
+      instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
+        s"then cached during training. Be careful of double caching!")
+    }

     val instances = extractInstances(dataset)
       .setName("training instances")

-    if (handlePersistence && $(blockSize) == 1) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-
-    var requestedMetrics = Seq("mean", "std", "count")
-    if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
     val (summarizer, labelSummarizer) = Summarizer
-      .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
+      .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))

     val numFeatures = summarizer.mean.size
     val histogram = labelSummarizer.histogram
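The warning added above exists because train() now always standardizes, blockifies, and caches its own copy of the input, so persisting the dataset beforehand only doubles memory use. An illustrative caller-side sketch (the helper name and DataFrame are placeholders, not part of this commit):

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.sql.DataFrame

    // No need to persist `training` first: fit() caches the blockified
    // copy internally, and a pre-persisted input would trigger the new
    // "double caching" warning.
    def fitWithoutPreCaching(training: DataFrame) =
      new LogisticRegression().fit(training)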
@@ -547,14 +531,13 @@ class LogisticRegression @Since("1.2.0") (
     instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
     instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
     instr.logSumOfWeights(summarizer.weightSum)
-    if ($(blockSize) > 1) {
-      val scale = 1.0 / summarizer.count / numFeatures
-      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
-      instr.logNamedValue("sparsity", sparsity.toString)
-      if (sparsity > 0.5) {
-        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
-          s"which may hurt performance in high-level BLAS.")
-      }
+
+    var actualBlockSizeInMB = $(maxBlockSizeInMB)
+    if (actualBlockSizeInMB == 0) {
+      // TODO: for Multinomial logistic regression, take numClasses into account
+      actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
+      require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0")
+      instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
     }

     val isMultinomial = checkMultinomial(numClasses)
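The hunk above encodes a zero-means-infer convention: a positive user-set value is taken as-is, while the default 0.0 falls back to InstanceBlock.DefaultBlockSizeInMB. A standalone sketch of that resolution logic (the helper is hypothetical; only the constant is named in the diff):

    // Hypothetical resolver mirroring the diff's fallback: a positive
    // user-set budget wins, 0.0 defers to the library-wide default.
    def resolveBlockSizeInMB(requested: Double, default: Double): Double = {
      require(requested >= 0.0, s"maxBlockSizeInMB must be >= 0 but got $requested")
      if (requested > 0.0) requested else default
    }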
@@ -584,7 +567,6 @@ class LogisticRegression @Since("1.2.0") (
       } else {
         Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity)
       }
-      if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
       return createModel(dataset, numClasses, coefMatrix, interceptVec, Array(0.0))
     }

@@ -636,14 +618,9 @@ class LogisticRegression @Since("1.2.0") (
       Note that the intercept in scaled space and original space is the same;
       as a result, no scaling is needed.
      */
-    val (allCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
-      trainOnRows(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix,
-        regularization, optimizer)
-    } else {
-      trainOnBlocks(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix,
-        regularization, optimizer)
-    }
-    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
+    val (allCoefficients, objectiveHistory) =
+      trainImpl(instances, actualBlockSizeInMB, featuresStd, numClasses,
+        initialCoefWithInterceptMatrix, regularization, optimizer)

     if (allCoefficients == null) {
       val msg = s"${optimizer.getClass.getName} failed."

@@ -949,40 +926,9 @@ class LogisticRegression @Since("1.2.0") (
     initialCoefWithInterceptMatrix
   }

-  private def trainOnRows(
-      instances: RDD[Instance],
-      featuresStd: Array[Double],
-      numClasses: Int,
-      initialCoefWithInterceptMatrix: Matrix,
-      regularization: Option[L2Regularization],
-      optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = {
-    val bcFeaturesStd = instances.context.broadcast(featuresStd)
-    val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
-      checkMultinomial(numClasses))(_)
-
-    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
-      regularization, $(aggregationDepth))
-    val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      new BDV[Double](initialCoefWithInterceptMatrix.toArray))
-
-    /*
-       Note that in Logistic Regression, the objective history (loss + regularization)
-       is log-likelihood which is invariant under feature standardization. As a result,
-       the objective history from optimizer is the same as the one in the original space.
-     */
-    val arrayBuilder = mutable.ArrayBuilder.make[Double]
-    var state: optimizer.State = null
-    while (states.hasNext) {
-      state = states.next()
-      arrayBuilder += state.adjustedValue
-    }
-    bcFeaturesStd.destroy()
-
-    (if (state == null) null else state.x.toArray, arrayBuilder.result)
-  }
-
-  private def trainOnBlocks(
+  private def trainImpl(
       instances: RDD[Instance],
+      actualBlockSizeInMB: Double,
       featuresStd: Array[Double],
       numClasses: Int,
       initialCoefWithInterceptMatrix: Matrix,
@@ -996,9 +942,11 @@ class LogisticRegression @Since("1.2.0") (
       val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
       iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+
+    val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
+    val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
       .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training blocks (blockSize=${$(blockSize)})")
+      .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")

     val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept),
       checkMultinomial(numClasses))(_)
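In trainImpl the megabyte budget is converted to a byte cap before blockification, and rounding up with ceil guarantees that a fractional budget never truncates to zero bytes. A worked sketch of the arithmetic (the helper name is hypothetical; the expression is the diff's):

    // Same conversion as the diff: MB -> bytes, rounded up.
    def toMaxMemUsage(blockSizeInMB: Double): Long =
      (blockSizeInMB * 1024L * 1024L).ceil.toLong

    // toMaxMemUsage(0.25) == 262144L   (a quarter of a mebibyte)
    // toMaxMemUsage(1.0)  == 1048576L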

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala

Lines changed: 9 additions & 8 deletions

@@ -18,8 +18,8 @@
 package org.apache.spark.ml.optim.aggregator

 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature._
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.regression.AFTPoint

 /**
  * AFTAggregator computes the gradient and loss for a AFT loss function,

@@ -108,26 +108,26 @@ import org.apache.spark.ml.regression.AFTPoint
 private[ml] class AFTAggregator(
     bcFeaturesStd: Broadcast[Array[Double]],
     fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] {
+  extends DifferentiableLossAggregator[Instance, AFTAggregator] {

   protected override val dim: Int = bcCoefficients.value.size

   /**
    * Add a new training data to this AFTAggregator, and update the loss and gradient
    * of the objective function.
    *
-   * @param data The AFTPoint representation for one data point to be added into this aggregator.
+   * @param data The Instance representation for one data point to be added into this aggregator.
    * @return This AFTAggregator object.
    */
-  def add(data: AFTPoint): this.type = {
+  def add(data: Instance): this.type = {
     val coefficients = bcCoefficients.value.toArray
     val intercept = coefficients(dim - 2)
     // sigma is the scale parameter of the AFT model
     val sigma = math.exp(coefficients(dim - 1))

     val xi = data.features
     val ti = data.label
-    val delta = data.censor
+    val delta = data.weight

     require(ti > 0.0, "The lifetime or label should be greater than 0.")

@@ -176,7 +176,7 @@ private[ml] class AFTAggregator(
  */
 private[ml] class BlockAFTAggregator(
     fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[(Matrix, Array[Double], Array[Double]),
+  extends DifferentiableLossAggregator[InstanceBlock,
     BlockAFTAggregator] {

   protected override val dim: Int = bcCoefficients.value.size
@@ -196,8 +196,9 @@ private[ml] class BlockAFTAggregator(
    *
    * @return This BlockAFTAggregator object.
    */
-  def add(block: (Matrix, Array[Double], Array[Double])): this.type = {
-    val (matrix, labels, censors) = block
+  def add(block: InstanceBlock): this.type = {
+    // here use Instance.weight to store censor for convenience
+    val (matrix, labels, censors) = (block.matrix, block.labels, block.weightIter.toArray)
     require(matrix.isTransposed)
     require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " +
       s"instance. Expecting $numFeatures but got ${matrix.numCols}.")
