From 563cee9e8a2edd2a0f1025de531002488de1e63a Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 27 Apr 2020 17:16:56 +0800 Subject: [PATCH 1/4] init nit nit nit nit nit nit nit nit nit nit remove some transient lazy variables nit py py --- .../org/apache/spark/ml/impl/Utils.scala | 15 + .../classification/LogisticRegression.scala | 812 ++++++++++-------- .../optim/aggregator/LogisticAggregator.scala | 248 +++++- .../LogisticRegressionSuite.scala | 29 + .../aggregator/HingeAggregatorSuite.scala | 4 +- .../aggregator/LogisticAggregatorSuite.scala | 69 +- python/pyspark/ml/classification.py | 26 +- 7 files changed, 850 insertions(+), 353 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala index ee3e99c0a8a5..80738be41e0e 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala @@ -78,4 +78,19 @@ private[spark] object Utils { i * (i + 1) / 2 + j } } + + /** + * When `x` is positive and large, computing `math.log(1 + math.exp(x))` will lead to arithmetic + * overflow. This will happen when `x > 709.78` which is not a very large number. + * It can be addressed by rewriting the formula into `x + math.log1p(math.exp(-x))` when `x > 0`. + * @param x a floating-point value as input. + * @return the result of `math.log(1 + math.exp(x))`. + */ + def log1pExp(x: Double): Double = { + if (x > 0) { + x + math.log1p(math.exp(-x)) + } else { + math.log1p(math.exp(x)) + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 25678ea6a8f7..10cf96180090 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -22,14 +22,15 @@ import java.util.Locale import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} -import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB, OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, DiffFunction, FirstOrderMinimizer, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB, OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.optim.aggregator.LogisticAggregator +import org.apache.spark.ml.optim.aggregator._ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -38,6 +39,7 @@ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics} import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DataType, DoubleType, StructType} @@ -49,7 +51,8 @@ import org.apache.spark.util.VersionUtils */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol - with 
HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth { + with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth + with HasBlockSize { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -429,6 +432,25 @@ class LogisticRegression @Since("1.2.0") ( @Since("2.2.0") def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) + /** + * Set block size for stacking input data in matrices. + * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; + * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines + * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). + * Recommended size is between 10 and 1000. An appropriate choice of the block size depends + * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, + * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). + * Note that existing BLAS implementations are mainly optimized for dense matrices, if the + * input dataset is sparse, stacking may bring no performance gain, the worse is possible + * performance regression. + * Default is 1. + * + * @group expertSetParam + */ + @Since("3.1.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + setDefault(blockSize -> 1) + private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, numFeatures: Int): Unit = { @@ -489,27 +511,26 @@ class LogisticRegression @Since("1.2.0") ( protected[spark] def train( dataset: Dataset[_], handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr => - val instances = extractInstances(dataset) - - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol, - fitIntercept) + fitIntercept, blockSize) - val (summarizer, labelSummarizer) = - Summarizer.getClassificationSummarizers(instances, $(aggregationDepth)) + val instances = extractInstances(dataset).setName("training instances") + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logNumExamples(summarizer.count) - instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) - instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) - instr.logSumOfWeights(summarizer.weightSum) + val (summarizer, labelSummarizer) = if ($(blockSize) == 1) { + Summarizer.getClassificationSummarizers(instances, $(aggregationDepth)) + } else { + // instances will be standardized and converted to blocks, so no need to cache instances. 
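+      // Request the metrics needed below: "mean"/"std" for standardization, "count" for
+      // instrumentation, and "numNonZeros" for the sparsity estimate logged before training.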
+ Summarizer.getClassificationSummarizers(instances, $(aggregationDepth), + Seq("mean", "std", "count", "numNonZeros")) + } + val numFeatures = summarizer.mean.size val histogram = labelSummarizer.histogram val numInvalid = labelSummarizer.countInvalid - val numFeatures = summarizer.mean.size val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { @@ -520,15 +541,30 @@ class LogisticRegression @Since("1.2.0") ( case None => histogram.length } - val isMultinomial = getFamily.toLowerCase(Locale.ROOT) match { - case "binomial" => - require(numClasses == 1 || numClasses == 2, s"Binomial family only supports 1 or 2 " + - s"outcome classes but found $numClasses.") - false - case "multinomial" => true - case "auto" => numClasses > 2 - case other => throw new IllegalArgumentException(s"Unsupported family: $other") + if (numInvalid != 0) { + val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + + s"Found $numInvalid invalid labels." + instr.logError(msg) + throw new SparkException(msg) + } + + instr.logNumClasses(numClasses) + instr.logNumFeatures(numFeatures) + instr.logNumExamples(summarizer.count) + instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) + instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) + instr.logSumOfWeights(summarizer.weightSum) + if ($(blockSize) > 1) { + val scale = 1.0 / summarizer.count / numFeatures + val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum + instr.logNamedValue("sparsity", sparsity.toString) + if (sparsity > 0.5) { + instr.logWarning(s"sparsity of input dataset is $sparsity, " + + s"which may hurt performance in high-level BLAS.") + } } + + val isMultinomial = checkMultinomial(numClasses) val numCoefficientSets = if (isMultinomial) numClasses else 1 // Check params interaction is valid if fitting under bound constrained optimization. @@ -542,335 +578,156 @@ class LogisticRegression @Since("1.2.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } - instr.logNumClasses(numClasses) - instr.logNumFeatures(numFeatures) - - val (coefficientMatrix, interceptVector, objectiveHistory) = { - if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." - instr.logError(msg) - throw new SparkException(msg) - } - - val isConstantLabel = histogram.count(_ != 0.0) == 1 - - if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) { - instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " + - s"coefficients will be zeros. 
Training is not needed.") - val constantLabelIndex = Vectors.dense(histogram).argmax - val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures, - new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, Array.emptyDoubleArray, - isTransposed = true).compressed - val interceptVec = if (isMultinomial) { - Vectors.sparse(numClasses, Seq((constantLabelIndex, Double.PositiveInfinity))) - } else { - Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity) - } - (coefMatrix, interceptVec, Array.emptyDoubleArray) + val isConstantLabel = histogram.count(_ != 0.0) == 1 + if ($(fitIntercept) && isConstantLabel && !usingBoundConstrainedOptimization) { + instr.logWarning(s"All labels are the same value and fitIntercept=true, so the " + + s"coefficients will be zeros. Training is not needed.") + val constantLabelIndex = Vectors.dense(histogram).argmax + val coefMatrix = new SparseMatrix(numCoefficientSets, numFeatures, + new Array[Int](numCoefficientSets + 1), Array.emptyIntArray, Array.emptyDoubleArray, + isTransposed = true).compressed + val interceptVec = if (isMultinomial) { + Vectors.sparse(numClasses, Seq((constantLabelIndex, Double.PositiveInfinity))) } else { - if (!$(fitIntercept) && isConstantLabel) { - instr.logWarning(s"All labels belong to a single class and fitIntercept=false. It's a " + - s"dangerous ground, so the algorithm may not converge.") - } - - val featuresMean = summarizer.mean.toArray - val featuresStd = summarizer.std.toArray - - if (!$(fitIntercept) && (0 until numFeatures).exists { i => - featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { - instr.logWarning("Fitting LogisticRegressionModel without intercept on dataset with " + - "constant nonzero column, Spark MLlib outputs zero coefficients for constant " + - "nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.") - } - - val regParamL1 = $(elasticNetParam) * $(regParam) - val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) - - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), - multinomial = isMultinomial)(_) - val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { - featuresStd(j / numCoefficientSets) - } else { - 0.0 - } + Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity) + } + if (handlePersistence) instances.unpersist() + return createModel(dataset, numClasses, coefMatrix, interceptVec, Array.empty) + } - val regularization = if (regParamL2 != 0.0) { - val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures * numCoefficientSets - Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(getFeaturesStd))) - } else { - None - } + if (!$(fitIntercept) && isConstantLabel) { + instr.logWarning(s"All labels belong to a single class and fitIntercept=false. 
It's a " + + s"dangerous ground, so the algorithm may not converge.") + } - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, - $(aggregationDepth)) - - val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets - - val (lowerBounds, upperBounds): (Array[Double], Array[Double]) = { - if (usingBoundConstrainedOptimization) { - val lowerBounds = Array.fill[Double](numCoeffsPlusIntercepts)(Double.NegativeInfinity) - val upperBounds = Array.fill[Double](numCoeffsPlusIntercepts)(Double.PositiveInfinity) - val isSetLowerBoundsOnCoefficients = isSet(lowerBoundsOnCoefficients) - val isSetUpperBoundsOnCoefficients = isSet(upperBoundsOnCoefficients) - val isSetLowerBoundsOnIntercepts = isSet(lowerBoundsOnIntercepts) - val isSetUpperBoundsOnIntercepts = isSet(upperBoundsOnIntercepts) - - var i = 0 - while (i < numCoeffsPlusIntercepts) { - val coefficientSetIndex = i % numCoefficientSets - val featureIndex = i / numCoefficientSets - if (featureIndex < numFeatures) { - if (isSetLowerBoundsOnCoefficients) { - lowerBounds(i) = $(lowerBoundsOnCoefficients)( - coefficientSetIndex, featureIndex) * featuresStd(featureIndex) - } - if (isSetUpperBoundsOnCoefficients) { - upperBounds(i) = $(upperBoundsOnCoefficients)( - coefficientSetIndex, featureIndex) * featuresStd(featureIndex) - } - } else { - if (isSetLowerBoundsOnIntercepts) { - lowerBounds(i) = $(lowerBoundsOnIntercepts)(coefficientSetIndex) - } - if (isSetUpperBoundsOnIntercepts) { - upperBounds(i) = $(upperBoundsOnIntercepts)(coefficientSetIndex) - } - } - i += 1 - } - (lowerBounds, upperBounds) - } else { - (null, null) - } - } + val featuresMean = summarizer.mean.toArray + val featuresStd = summarizer.std.toArray - val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { - if (lowerBounds != null && upperBounds != null) { - new BreezeLBFGSB( - BDV[Double](lowerBounds), BDV[Double](upperBounds), $(maxIter), 10, $(tol)) - } else { - new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) - } - } else { - val standardizationParam = $(standardization) - def regParamL1Fun = (index: Int) => { - // Remove the L1 penalization on the intercept - val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets - if (isIntercept) { - 0.0 - } else { - if (standardizationParam) { - regParamL1 - } else { - val featureIndex = index / numCoefficientSets - // If `standardization` is false, we still standardize the data - // to improve the rate of convergence; as a result, we have to - // perform this reverse standardization by penalizing each component - // differently to get effectively the same objective function when - // the training dataset is not standardized. - if (featuresStd(featureIndex) != 0.0) { - regParamL1 / featuresStd(featureIndex) - } else { - 0.0 - } - } - } - } - new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) - } - - /* - The coefficients are laid out in column major order during training. Here we initialize - a column major matrix of initial coefficients. 
- */ - val initialCoefWithInterceptMatrix = - Matrices.zeros(numCoefficientSets, numFeaturesPlusIntercept) - - val initialModelIsValid = optInitialModel match { - case Some(_initialModel) => - val providedCoefs = _initialModel.coefficientMatrix - val modelIsValid = (providedCoefs.numRows == numCoefficientSets) && - (providedCoefs.numCols == numFeatures) && - (_initialModel.interceptVector.size == numCoefficientSets) && - (_initialModel.getFitIntercept == $(fitIntercept)) - if (!modelIsValid) { - instr.logWarning(s"Initial coefficients will be ignored! Its dimensions " + - s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " + - s"expected size ($numCoefficientSets, $numFeatures)") - } - modelIsValid - case None => false - } + if (!$(fitIntercept) && (0 until numFeatures).exists { i => + featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { + instr.logWarning("Fitting LogisticRegressionModel without intercept on dataset with " + + "constant nonzero column, Spark MLlib outputs zero coefficients for constant " + + "nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.") + } - if (initialModelIsValid) { - val providedCoef = optInitialModel.get.coefficientMatrix - providedCoef.foreachActive { (classIndex, featureIndex, value) => - // We need to scale the coefficients since they will be trained in the scaled space - initialCoefWithInterceptMatrix.update(classIndex, featureIndex, - value * featuresStd(featureIndex)) - } - if ($(fitIntercept)) { - optInitialModel.get.interceptVector.foreachNonZero { (classIndex, value) => - initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value) - } - } - } else if ($(fitIntercept) && isMultinomial) { - /* - For multinomial logistic regression, when we initialize the coefficients as zeros, - it will converge faster if we initialize the intercepts such that - it follows the distribution of the labels. - {{{ - P(1) = \exp(b_1) / Z - ... - P(K) = \exp(b_K) / Z - where Z = \sum_{k=1}^{K} \exp(b_k) - }}} - Since this doesn't have a unique solution, one of the solutions that satisfies the - above equations is - {{{ - \exp(b_k) = count_k * \exp(\lambda) - b_k = \log(count_k) * \lambda - }}} - \lambda is a free parameter, so choose the phase \lambda such that the - mean is centered. This yields - {{{ - b_k = \log(count_k) - b_k' = b_k - \mean(b_k) - }}} - */ - val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x)) - val rawMean = rawIntercepts.sum / rawIntercepts.length - rawIntercepts.indices.foreach { i => - initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) - } - } else if ($(fitIntercept)) { - /* - For binary logistic regression, when we initialize the coefficients as zeros, - it will converge faster if we initialize the intercept such that - it follows the distribution of the labels. 
- - {{{ - P(0) = 1 / (1 + \exp(b)), and - P(1) = \exp(b) / (1 + \exp(b)) - }}}, hence - {{{ - b = \log{P(1) / P(0)} = \log{count_1 / count_0} - }}} - */ - initialCoefWithInterceptMatrix.update(0, numFeatures, - math.log(histogram(1) / histogram(0))) - } + val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) + val regularization = if (regParamL2 != 0.0) { + val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { + featuresStd(j / numCoefficientSets) + } else 0.0 + val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures * numCoefficientSets + Some(new L2Regularization(regParamL2, shouldApply, + if ($(standardization)) None else Some(getFeaturesStd))) + } else None + + val (lowerBounds, upperBounds) = createBounds(numClasses, numFeatures, featuresStd) + + val optimizer = createOptimizer(numClasses, numFeatures, featuresStd, + lowerBounds, upperBounds) + + /* + The coefficients are laid out in column major order during training. Here we initialize + a column major matrix of initial coefficients. + */ + val initialCoefWithInterceptMatrix = createInitCoefWithInterceptMatrix( + numClasses, numFeatures, histogram, featuresStd, lowerBounds, upperBounds, instr) + + /* + The coefficients are trained in the scaled space; we're converting them back to + the original space. + + Additionally, since the coefficients were laid out in column major order during training + to avoid extra computation, we convert them back to row major before passing them to the + model. + + Note that the intercept in scaled space and original space is the same; + as a result, no scaling is needed. + */ + val (allCoefficients, objectiveHistory) = if ($(blockSize) == 1) { + trainOnRows(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix, + regularization, optimizer) + } else { + trainOnBlocks(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix, + regularization, optimizer) + } + if (handlePersistence) instances.unpersist() - if (usingBoundConstrainedOptimization) { - // Make sure all initial values locate in the corresponding bound. - var i = 0 - while (i < numCoeffsPlusIntercepts) { - val coefficientSetIndex = i % numCoefficientSets - val featureIndex = i / numCoefficientSets - if (initialCoefWithInterceptMatrix(coefficientSetIndex, featureIndex) < lowerBounds(i)) - { - initialCoefWithInterceptMatrix.update( - coefficientSetIndex, featureIndex, lowerBounds(i)) - } else if ( - initialCoefWithInterceptMatrix(coefficientSetIndex, featureIndex) > upperBounds(i)) - { - initialCoefWithInterceptMatrix.update( - coefficientSetIndex, featureIndex, upperBounds(i)) - } - i += 1 - } - } + if (allCoefficients == null) { + val msg = s"${optimizer.getClass.getName} failed." + instr.logError(msg) + throw new SparkException(msg) + } - val states = optimizer.iterations(new CachedDiffFunction(costFun), - new BDV[Double](initialCoefWithInterceptMatrix.toArray)) - - /* - Note that in Logistic Regression, the objective history (loss + regularization) - is log-likelihood which is invariant under feature standardization. As a result, - the objective history from optimizer is the same as the one in the original space. 
- */ - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - bcFeaturesStd.destroy() + val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, + allCoefficients) + val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures, + new Array[Double](numCoefficientSets * numFeatures), isTransposed = true) + val interceptVec = if ($(fitIntercept) || !isMultinomial) { + Vectors.zeros(numCoefficientSets) + } else { + Vectors.sparse(numCoefficientSets, Seq.empty) + } + // separate intercepts and coefficients from the combined matrix + allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => + val isIntercept = $(fitIntercept) && (featureIndex == numFeatures) + if (!isIntercept && featuresStd(featureIndex) != 0.0) { + denseCoefficientMatrix.update(classIndex, featureIndex, + value / featuresStd(featureIndex)) + } + if (isIntercept) interceptVec.toArray(classIndex) = value + } - if (state == null) { - val msg = s"${optimizer.getClass.getName} failed." - instr.logError(msg) - throw new SparkException(msg) - } + if ($(regParam) == 0.0 && isMultinomial && !usingBoundConstrainedOptimization) { + /* + When no regularization is applied, the multinomial coefficients lack identifiability + because we do not use a pivot class. We can add any constant value to the coefficients + and get the same likelihood. So here, we choose the mean centered coefficients for + reproducibility. This method follows the approach in glmnet, described here: + + Friedman, et al. "Regularization Paths for Generalized Linear Models via + Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf + */ + val centers = Array.ofDim[Double](numFeatures) + denseCoefficientMatrix.foreachActive { case (i, j, v) => + centers(j) += v + } + centers.transform(_ / numCoefficientSets) + denseCoefficientMatrix.foreachActive { case (i, j, v) => + denseCoefficientMatrix.update(i, j, v - centers(j)) + } + } - /* - The coefficients are trained in the scaled space; we're converting them back to - the original space. - - Additionally, since the coefficients were laid out in column major order during training - to avoid extra computation, we convert them back to row major before passing them to the - model. - - Note that the intercept in scaled space and original space is the same; - as a result, no scaling is needed. 
- */ - val allCoefficients = state.x.toArray.clone() - val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, - allCoefficients) - val denseCoefficientMatrix = new DenseMatrix(numCoefficientSets, numFeatures, - new Array[Double](numCoefficientSets * numFeatures), isTransposed = true) - val interceptVec = if ($(fitIntercept) || !isMultinomial) { - Vectors.zeros(numCoefficientSets) - } else { - Vectors.sparse(numCoefficientSets, Seq.empty) - } - // separate intercepts and coefficients from the combined matrix - allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => - val isIntercept = $(fitIntercept) && (featureIndex == numFeatures) - if (!isIntercept && featuresStd(featureIndex) != 0.0) { - denseCoefficientMatrix.update(classIndex, featureIndex, - value / featuresStd(featureIndex)) - } - if (isIntercept) interceptVec.toArray(classIndex) = value - } + // center the intercepts when using multinomial algorithm + if ($(fitIntercept) && isMultinomial && !usingBoundConstrainedOptimization) { + val interceptArray = interceptVec.toArray + val interceptMean = interceptArray.sum / interceptArray.length + (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean } + } - if ($(regParam) == 0.0 && isMultinomial && !usingBoundConstrainedOptimization) { - /* - When no regularization is applied, the multinomial coefficients lack identifiability - because we do not use a pivot class. We can add any constant value to the coefficients - and get the same likelihood. So here, we choose the mean centered coefficients for - reproducibility. This method follows the approach in glmnet, described here: - - Friedman, et al. "Regularization Paths for Generalized Linear Models via - Coordinate Descent," https://core.ac.uk/download/files/153/6287975.pdf - */ - val centers = Array.ofDim[Double](numFeatures) - denseCoefficientMatrix.foreachActive { case (i, j, v) => - centers(j) += v - } - centers.transform(_ / numCoefficientSets) - denseCoefficientMatrix.foreachActive { case (i, j, v) => - denseCoefficientMatrix.update(i, j, v - centers(j)) - } - } + return createModel(dataset, numClasses, denseCoefficientMatrix.compressed, + interceptVec.compressed, objectiveHistory) + } - // center the intercepts when using multinomial algorithm - if ($(fitIntercept) && isMultinomial && !usingBoundConstrainedOptimization) { - val interceptArray = interceptVec.toArray - val interceptMean = interceptArray.sum / interceptArray.length - (0 until interceptVec.size).foreach { i => interceptArray(i) -= interceptMean } - } - (denseCoefficientMatrix.compressed, interceptVec.compressed, arrayBuilder.result()) - } + private def checkMultinomial(numClasses: Int): Boolean = { + $(family).toLowerCase(Locale.ROOT) match { + case "binomial" => + require(numClasses == 1 || numClasses == 2, s"Binomial family only supports 1 or 2 " + + s"outcome classes but found $numClasses.") + false + case "multinomial" => true + case "auto" => numClasses > 2 + case other => throw new IllegalArgumentException(s"Unsupported family: $other") } + } - if (handlePersistence) instances.unpersist() - + private def createModel( + dataset: Dataset[_], + numClasses: Int, + coefficientMatrix: Matrix, + interceptVector: Vector, + objectiveHistory: Array[Double]): LogisticRegressionModel = { val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector, - numClasses, isMultinomial)) + numClasses, checkMultinomial(numClasses))) val (summaryModel, probabilityColName, predictionColName) = 
model.findSummaryModel() val logRegSummary = if (numClasses <= 2) { @@ -893,6 +750,285 @@ class LogisticRegression @Since("1.2.0") ( model.setSummary(Some(logRegSummary)) } + private def createBounds( + numClasses: Int, + numFeatures: Int, + featuresStd: Array[Double]): (Array[Double], Array[Double]) = { + val isMultinomial = checkMultinomial(numClasses) + val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures + val numCoefficientSets = if (isMultinomial) numClasses else 1 + val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets + if (usingBoundConstrainedOptimization) { + val lowerBounds = Array.fill[Double](numCoeffsPlusIntercepts)(Double.NegativeInfinity) + val upperBounds = Array.fill[Double](numCoeffsPlusIntercepts)(Double.PositiveInfinity) + val isSetLowerBoundsOnCoefficients = isSet(lowerBoundsOnCoefficients) + val isSetUpperBoundsOnCoefficients = isSet(upperBoundsOnCoefficients) + val isSetLowerBoundsOnIntercepts = isSet(lowerBoundsOnIntercepts) + val isSetUpperBoundsOnIntercepts = isSet(upperBoundsOnIntercepts) + + var i = 0 + while (i < numCoeffsPlusIntercepts) { + val coefficientSetIndex = i % numCoefficientSets + val featureIndex = i / numCoefficientSets + if (featureIndex < numFeatures) { + if (isSetLowerBoundsOnCoefficients) { + lowerBounds(i) = $(lowerBoundsOnCoefficients)( + coefficientSetIndex, featureIndex) * featuresStd(featureIndex) + } + if (isSetUpperBoundsOnCoefficients) { + upperBounds(i) = $(upperBoundsOnCoefficients)( + coefficientSetIndex, featureIndex) * featuresStd(featureIndex) + } + } else { + if (isSetLowerBoundsOnIntercepts) { + lowerBounds(i) = $(lowerBoundsOnIntercepts)(coefficientSetIndex) + } + if (isSetUpperBoundsOnIntercepts) { + upperBounds(i) = $(upperBoundsOnIntercepts)(coefficientSetIndex) + } + } + i += 1 + } + (lowerBounds, upperBounds) + } else { + (null, null) + } + } + + private def createOptimizer( + numClasses: Int, + numFeatures: Int, + featuresStd: Array[Double], + lowerBounds: Array[Double], + upperBounds: Array[Double]): FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]] = { + val isMultinomial = checkMultinomial(numClasses) + val regParamL1 = $(elasticNetParam) * $(regParam) + val numCoefficientSets = if (isMultinomial) numClasses else 1 + if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) { + if (lowerBounds != null && upperBounds != null) { + new BreezeLBFGSB( + BDV[Double](lowerBounds), BDV[Double](upperBounds), $(maxIter), 10, $(tol)) + } else { + new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + } + } else { + val standardizationParam = $(standardization) + def regParamL1Fun = (index: Int) => { + // Remove the L1 penalization on the intercept + val isIntercept = $(fitIntercept) && index >= numFeatures * numCoefficientSets + if (isIntercept) { + 0.0 + } else if (standardizationParam) { + regParamL1 + } else { + val featureIndex = index / numCoefficientSets + // If `standardization` is false, we still standardize the data + // to improve the rate of convergence; as a result, we have to + // perform this reverse standardization by penalizing each component + // differently to get effectively the same objective function when + // the training dataset is not standardized. + if (featuresStd(featureIndex) != 0.0) { + regParamL1 / featuresStd(featureIndex) + } else 0.0 + } + } + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + } + } + + /** + * The coefficients are laid out in column major order during training. 
Here we initialize + * a column major matrix of initial coefficients. + */ + private def createInitCoefWithInterceptMatrix( + numClasses: Int, + numFeatures: Int, + histogram: Array[Double], + featuresStd: Array[Double], + lowerBounds: Array[Double], + upperBounds: Array[Double], + instr: Instrumentation): DenseMatrix = { + val isMultinomial = checkMultinomial(numClasses) + val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures + val numCoefficientSets = if (isMultinomial) numClasses else 1 + val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets + val initialCoefWithInterceptMatrix = + DenseMatrix.zeros(numCoefficientSets, numFeaturesPlusIntercept) + + val initialModelIsValid = optInitialModel match { + case Some(_initialModel) => + val providedCoefs = _initialModel.coefficientMatrix + val modelIsValid = (providedCoefs.numRows == numCoefficientSets) && + (providedCoefs.numCols == numFeatures) && + (_initialModel.interceptVector.size == numCoefficientSets) && + (_initialModel.getFitIntercept == $(fitIntercept)) + if (!modelIsValid) { + instr.logWarning(s"Initial coefficients will be ignored! Its dimensions " + + s"(${providedCoefs.numRows}, ${providedCoefs.numCols}) did not match the " + + s"expected size ($numCoefficientSets, $numFeatures)") + } + modelIsValid + case None => false + } + + if (initialModelIsValid) { + val providedCoef = optInitialModel.get.coefficientMatrix + providedCoef.foreachActive { (classIndex, featureIndex, value) => + // We need to scale the coefficients since they will be trained in the scaled space + initialCoefWithInterceptMatrix.update(classIndex, featureIndex, + value * featuresStd(featureIndex)) + } + if ($(fitIntercept)) { + optInitialModel.get.interceptVector.foreachNonZero { (classIndex, value) => + initialCoefWithInterceptMatrix.update(classIndex, numFeatures, value) + } + } + } else if ($(fitIntercept) && isMultinomial) { + /* + For multinomial logistic regression, when we initialize the coefficients as zeros, + it will converge faster if we initialize the intercepts such that + it follows the distribution of the labels. + {{{ + P(1) = \exp(b_1) / Z + ... + P(K) = \exp(b_K) / Z + where Z = \sum_{k=1}^{K} \exp(b_k) + }}} + Since this doesn't have a unique solution, one of the solutions that satisfies the + above equations is + {{{ + \exp(b_k) = count_k * \exp(\lambda) + b_k = \log(count_k) * \lambda + }}} + \lambda is a free parameter, so choose the phase \lambda such that the + mean is centered. This yields + {{{ + b_k = \log(count_k) + b_k' = b_k - \mean(b_k) + }}} + */ + val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x)) + val rawMean = rawIntercepts.sum / rawIntercepts.length + rawIntercepts.indices.foreach { i => + initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) + } + } else if ($(fitIntercept)) { + /* + For binary logistic regression, when we initialize the coefficients as zeros, + it will converge faster if we initialize the intercept such that + it follows the distribution of the labels. + + {{{ + P(0) = 1 / (1 + \exp(b)), and + P(1) = \exp(b) / (1 + \exp(b)) + }}}, hence + {{{ + b = \log{P(1) / P(0)} = \log{count_1 / count_0} + }}} + */ + initialCoefWithInterceptMatrix.update(0, numFeatures, + math.log(histogram(1) / histogram(0))) + } + + if (usingBoundConstrainedOptimization) { + // Make sure all initial values locate in the corresponding bound. 
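+      // Bounds are indexed over the flattened column-major coefficient matrix: for index i,
+      // the class (row) is i % numCoefficientSets and the feature (column) is
+      // i / numCoefficientSets; any initial value outside [lowerBounds(i), upperBounds(i)]
+      // is clamped to the violated bound.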
+ var i = 0 + while (i < numCoeffsPlusIntercepts) { + val coefficientSetIndex = i % numCoefficientSets + val featureIndex = i / numCoefficientSets + if (initialCoefWithInterceptMatrix(coefficientSetIndex, featureIndex) < lowerBounds(i)) + { + initialCoefWithInterceptMatrix.update( + coefficientSetIndex, featureIndex, lowerBounds(i)) + } else if ( + initialCoefWithInterceptMatrix(coefficientSetIndex, featureIndex) > upperBounds(i)) + { + initialCoefWithInterceptMatrix.update( + coefficientSetIndex, featureIndex, upperBounds(i)) + } + i += 1 + } + } + + initialCoefWithInterceptMatrix + } + + private def trainOnRows( + instances: RDD[Instance], + featuresStd: Array[Double], + numClasses: Int, + initialCoefWithInterceptMatrix: Matrix, + regularization: Option[L2Regularization], + optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = { + val bcFeaturesStd = instances.context.broadcast(featuresStd) + val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), + checkMultinomial(numClasses))(_) + + val costFun = new RDDLossFunction(instances, getAggregatorFunc, + regularization, $(aggregationDepth)) + val states = optimizer.iterations(new CachedDiffFunction(costFun), + new BDV[Double](initialCoefWithInterceptMatrix.toArray)) + + /* + Note that in Logistic Regression, the objective history (loss + regularization) + is log-likelihood which is invariant under feature standardization. As a result, + the objective history from optimizer is the same as the one in the original space. + */ + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + arrayBuilder += state.adjustedValue + } + bcFeaturesStd.destroy() + + (if (state == null) null else state.x.toArray, arrayBuilder.result) + } + + private def trainOnBlocks( + instances: RDD[Instance], + featuresStd: Array[Double], + numClasses: Int, + initialCoefWithInterceptMatrix: Matrix, + regularization: Option[L2Regularization], + optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = { + val numFeatures = featuresStd.length + val bcFeaturesStd = instances.context.broadcast(featuresStd) + + val standardized = instances.mapPartitions { iter => + val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 } + val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) + iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } + } + val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept), + checkMultinomial(numClasses))(_) + + val costFun = new RDDLossFunction(blocks, getAggregatorFunc, + regularization, $(aggregationDepth)) + val states = optimizer.iterations(new CachedDiffFunction(costFun), + new BDV[Double](initialCoefWithInterceptMatrix.toArray)) + + /* + Note that in Logistic Regression, the objective history (loss + regularization) + is log-likelihood which is invariant under feature standardization. As a result, + the objective history from optimizer is the same as the one in the original space. 
+ */ + val arrayBuilder = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + arrayBuilder += state.adjustedValue + } + blocks.unpersist() + bcFeaturesStd.destroy() + + (if (state == null) null else state.x.toArray, arrayBuilder.result) + } + @Since("1.4.0") override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index f2b3566f8f09..05a7999cffb3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -18,9 +18,9 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance -import org.apache.spark.ml.linalg.{DenseVector, Vector} -import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.impl.Utils +import org.apache.spark.ml.linalg._ /** * LogisticAggregator computes the gradient and loss for binary or multinomial logistic (softmax) @@ -203,7 +203,7 @@ private[ml] class LogisticAggregator( s"in {1, 2} but found $numClasses.") } - @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { + @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + s"got type ${bcCoefficients.value.getClass}.)") @@ -247,9 +247,9 @@ private[ml] class LogisticAggregator( if (label > 0) { // The following is equivalent to log(1 + exp(margin)) but more numerically stable. - lossSum += weight * MLUtils.log1pExp(margin) + lossSum += weight * Utils.log1pExp(margin) } else { - lossSum += weight * (MLUtils.log1pExp(margin) - margin) + lossSum += weight * (Utils.log1pExp(margin) - margin) } } @@ -364,3 +364,239 @@ private[ml] class LogisticAggregator( } } } + + +/** + * BlockLogisticAggregator computes the gradient and loss used in Logistic classification + * for blocks in sparse or dense matrix in an online fashion. + * + * Two BlockLogisticAggregator can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * NOTE: The feature values are expected to be standardized before computation. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. + */ +private[ml] class BlockLogisticAggregator( + numFeatures: Int, + numClasses: Int, + fitIntercept: Boolean, + multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, BlockLogisticAggregator] with Logging { + + if (multinomial && numClasses <= 2) { + logInfo(s"Multinomial logistic regression for binary classification yields separate " + + s"coefficients for positive and negative classes. When no regularization is applied, the" + + s"result will be effectively the same as binary logistic regression. 
When regularization" + + s"is applied, multinomial loss will produce a result different from binary loss.") + } + + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + private val coefficientSize = bcCoefficients.value.size + protected override val dim: Int = coefficientSize + + if (multinomial) { + require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + + s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") + } else { + require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + + s"coefficients but got $coefficientSize") + require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + + s"in {1, 2} but found $numClasses.") + } + + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + + s"got type ${bcCoefficients.value.getClass}.)") + } + + @transient private lazy val binaryLinear = (multinomial, fitIntercept) match { + case (false, true) => Vectors.dense(coefficientsArray.take(numFeatures)).toDense + case (false, false) => Vectors.dense(coefficientsArray).toDense + case _ => null + } + + @transient private lazy val multinomialLinear = (multinomial, fitIntercept) match { + case (true, true) => + Matrices.dense(numClasses, numFeatures, + coefficientsArray.take(numClasses * numFeatures)).toDense + case (true, false) => + Matrices.dense(numClasses, numFeatures, coefficientsArray).toDense + case _ => null + } + + /** + * Add a new training instance block to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The instance block of data point to be added. + * @return This LogisticAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(block.matrix.isTransposed) + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + + if (multinomial) { + multinomialUpdateInPlace(block) + } else { + binaryUpdateInPlace(block) + } + + this + } + + /** Update gradient and loss using binary loss function. */ + private def binaryUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + + // vec here represents margins or negative dotProducts + val vec = if (fitIntercept) { + Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense + } else { + Vectors.zeros(size).toDense + } + BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec) + + // in-place convert margins to multiplier + // then, vec represents multiplier + var i = 0 + var interceptGradSum = 0.0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + val margin = vec.values(i) + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. 
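+            // Utils.log1pExp evaluates margin + log1p(exp(-margin)) for positive margins and
+            // log1p(exp(margin)) otherwise, so exp(margin) never overflows for large margins.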
+ lossSum += weight * Utils.log1pExp(margin) + } else { + lossSum += weight * (Utils.log1pExp(margin) - margin) + } + val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) + vec.values(i) = multiplier + if (fitIntercept) interceptGradSum += multiplier + } else { vec.values(i) = 0.0 } + i += 1 + } + + // predictions are all correct, no gradient signal + if (vec.values.forall(_ == 0)) return + + block.matrix match { + case dm: DenseMatrix => + BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, + vec.values, 1, 1.0, gradientSumArray, 1) + if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum + + case sm: SparseMatrix if fitIntercept => + val linearGradSumVec = Vectors.zeros(numFeatures).toDense + BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) + BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, + gradientSumArray, 1) + gradientSumArray(numFeatures) += interceptGradSum + + case sm: SparseMatrix if !fitIntercept => + val gradSumVec = new DenseVector(gradientSumArray) + BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) + } + } + + /** Update gradient and loss using multinomial (softmax) loss function. */ + private def multinomialUpdateInPlace(block: InstanceBlock): Unit = { + val size = block.size + + // mat here represents margins, shape: S X C + val mat = DenseMatrix.zeros(size, numClasses) + if (fitIntercept) { + val offset = numClasses * numFeatures + var j = 0 + while (j < numClasses) { + val intercept = coefficientsArray(offset + j) + var i = 0 + while (i < size) { mat.update(i, j, intercept); i += 1 } + j += 1 + } + } + BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat) + + // in-place convert margins to multipliers + // then, mat represents multipliers + var i = 0 + val tmp = Array.ofDim[Double](numClasses) + val interceptGradSumArr = if (fitIntercept) Array.ofDim[Double](numClasses) else null + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + val label = block.getLabel(i) + + var maxMargin = Double.NegativeInfinity + var j = 0 + while (j < numClasses) { + tmp(j) = mat(i, j) + maxMargin = math.max(maxMargin, tmp(j)) + j += 1 + } + + // marginOfLabel is margins(label) in the formula + val marginOfLabel = tmp(label.toInt) + + var sum = 0.0 + j = 0 + while (j < numClasses) { + if (maxMargin > 0) tmp(j) -= maxMargin + val exp = math.exp(tmp(j)) + sum += exp + tmp(j) = exp + j += 1 + } + + j = 0 + while (j < numClasses) { + val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0)) + mat.update(i, j, multiplier) + if (fitIntercept) interceptGradSumArr(j) += multiplier + j += 1 + } + + if (maxMargin > 0) { + lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) + } else { + lossSum += weight * (math.log(sum) - marginOfLabel) + } + } else { + var j = 0; while (j < numClasses) { mat.update(i, j, 0.0); j += 1 } + } + i += 1 + } + + // mat (multipliers): S X C, dense N + // mat.transpose (multipliers): C X S, dense T + // block.matrix: S X F, unknown type T + // gradSumMat(gradientSumArray): C X FPI (numFeaturesPlusIntercept), dense N + block.matrix match { + case dm: DenseMatrix => + BLAS.nativeBLAS.dgemm("T", "T", numClasses, numFeatures, size, 1.0, + mat.values, size, dm.values, numFeatures, 1.0, gradientSumArray, numClasses) + + case sm: SparseMatrix => + // linearGradSumMat = matrix.T X mat + val linearGradSumMat = DenseMatrix.zeros(numFeatures, numClasses) + BLAS.gemm(1.0, sm.transpose, mat, 0.0, 
linearGradSumMat) + linearGradSumMat.foreachActive { (i, j, v) => gradientSumArray(i * numClasses + j) += v } + } + + if (fitIntercept) { + BLAS.getBLAS(numClasses).daxpy(numClasses, 1.0, interceptGradSumArr, 0, 1, + gradientSumArray, numClasses * numFeatures, 1) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 6f3ceb0dded2..933a63b40fcf 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -522,6 +522,35 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { testProbClassificationModelSingleProbPrediction(mlorModel, smallMultinomialDataset) } + test("LogisticRegression on blocks") { + for (dataset <- Seq(smallBinaryDataset, smallMultinomialDataset, binaryDataset, + multinomialDataset, multinomialDatasetWithZeroVar); fitIntercept <- Seq(true, false)) { + val mlor = new LogisticRegression() + .setFitIntercept(fitIntercept) + .setMaxIter(5) + .setFamily("multinomial") + val model = mlor.fit(dataset) + Seq(4, 16, 64).foreach { blockSize => + val model2 = mlor.setBlockSize(blockSize).fit(dataset) + assert(model.interceptVector ~== model2.interceptVector relTol 1e-6) + assert(model.coefficientMatrix ~== model2.coefficientMatrix relTol 1e-6) + } + } + + for (dataset <- Seq(smallBinaryDataset, binaryDataset); fitIntercept <- Seq(true, false)) { + val blor = new LogisticRegression() + .setFitIntercept(fitIntercept) + .setMaxIter(5) + .setFamily("binomial") + val model = blor.fit(dataset) + Seq(4, 16, 64).foreach { blockSize => + val model2 = blor.setBlockSize(blockSize).fit(dataset) + assert(model.intercept ~== model2.intercept relTol 1e-6) + assert(model.coefficients ~== model2.coefficients relTol 1e-6) + } + } + } + test("coefficients and intercept methods") { val mlr = new LogisticRegression().setMaxIter(1).setFamily("multinomial") val mlrModel = mlr.fit(smallMultinomialDataset) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 51a1edd315e5..e2b417882403 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -180,8 +180,8 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)), fitIntercept = true) blocks.foreach(blockAgg.add) - assert(loss ~== blockAgg.loss relTol 1e-9) - assert(gradient ~== blockAgg.gradient relTol 1e-9) + assert(agg.loss ~== blockAgg.loss relTol 1e-9) + assert(agg.gradient ~== blockAgg.gradient relTol 1e-9) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index 607823c500cb..a8fdcc38d13b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import 
org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.stat.Summarizer import org.apache.spark.ml.util.TestingUtils._ @@ -28,6 +28,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { @transient var instances: Array[Instance] = _ @transient var instancesConstantFeature: Array[Instance] = _ @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + @transient var standardizedInstances: Array[Instance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -46,6 +47,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) ) + standardizedInstances = standardize(instances) } /** Get summary statistics for some data and create a new LogisticAggregator. */ @@ -63,6 +65,35 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) } + /** Get summary statistics for some data and create a new BlockHingeAggregator. */ + private def getNewBlockAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean, + multinomial: Boolean): BlockLogisticAggregator = { + val (_, ySummarizer) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val numFeatures = instances.head.features.size + val numClasses = ySummarizer.histogram.length + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new BlockLogisticAggregator(numFeatures, numClasses, fitIntercept, multinomial)(bcCoefficients) + } + + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val stdArray = featuresSummarizer.std.toArray + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } + } + test("aggregator add method input size") { val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) @@ -187,6 +218,24 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) + + Seq(1, 2, 4).foreach { blockSize => + val blocks1 = standardizedInstances + .grouped(blockSize) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val blockAgg = getNewBlockAggregator(standardizedInstances, + Vectors.dense(coefArray ++ interceptArray), true, true) + blocks.foreach(blockAgg.add) + assert(agg.loss ~== blockAgg.loss relTol 1e-9) + assert(agg.gradient ~== blockAgg.gradient relTol 1e-9) + } + } } test("check correctness binomial") { @@ -232,6 +281,24 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) + + Seq(1, 2, 4).foreach { blockSize => + val blocks1 = standardize(binaryInstances) + .grouped(blockSize) + .map(seq => 
InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val blockAgg = getNewBlockAggregator(binaryInstances, + Vectors.dense(coefArray ++ Array(intercept)), true, false) + blocks.foreach(blockAgg.add) + assert(agg.loss ~== blockAgg.loss relTol 1e-9) + assert(agg.gradient ~== blockAgg.gradient relTol 1e-9) + } + } } test("check with zero standard deviation") { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 318ae7ac5998..d635be1d8db8 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -464,7 +464,7 @@ def intercept(self): class _LogisticRegressionParams(_ProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold): + HasThreshold, HasBlockSize): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -652,6 +652,8 @@ class LogisticRegression(_JavaProbabilisticClassifier, _LogisticRegressionParams LogisticRegressionModel... >>> blorModel.getProbabilityCol() 'newProbability' + >>> blorModel.getBlockSize() + 1 >>> blorModel.setThreshold(0.1) LogisticRegressionModel... >>> blorModel.getThreshold() @@ -714,7 +716,8 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, + blockSize=1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -723,13 +726,15 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, \ + blockSize=1): If the threshold and thresholds Params are both set, they must be equivalent. 
""" super(LogisticRegression, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LogisticRegression", self.uid) - self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") + self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", + blockSize=1) kwargs = self._input_kwargs self.setParams(**kwargs) self._checkThresholdConsistency() @@ -742,7 +747,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, + blockSize=1): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -750,7 +756,8 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \ aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ - lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None): + lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, \ + blockSize=1): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ @@ -845,6 +852,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.1.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class LogisticRegressionModel(_JavaProbabilisticClassificationModel, _LogisticRegressionParams, JavaMLWritable, JavaMLReadable, HasTrainingSummary): From 0577bb2c4c94b61ec9e09ab40b8dceca4e02584c Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 6 May 2020 12:29:53 +0800 Subject: [PATCH 2/4] fix doc --- .../src/main/scala/org/apache/spark/ml/impl/Utils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala index 80738be41e0e..305471951853 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/impl/Utils.scala @@ -81,8 +81,9 @@ private[spark] object Utils { /** * When `x` is positive and large, computing `math.log(1 + math.exp(x))` will lead to arithmetic - * overflow. This will happen when `x > 709.78` which is not a very large number. - * It can be addressed by rewriting the formula into `x + math.log1p(math.exp(-x))` when `x > 0`. + * overflow. This will happen when `x > 709.78` which is not a very large number. + * It can be addressed by rewriting the formula into `x + math.log1p(math.exp(-x))` + * when `x` is positive. * @param x a floating-point value as input. * @return the result of `math.log(1 + math.exp(x))`. 
*/ From 1aca7c5dddd554eafb040df4a78fcafc06fdc4fb Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 6 May 2020 18:27:10 +0800 Subject: [PATCH 3/4] nit --- .../ml/optim/aggregator/HingeAggregator.scala | 16 +++++++--------- .../optim/aggregator/LogisticAggregator.scala | 18 ++++++++---------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 1525bb9bfcbb..b1990f7c60f6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -132,15 +132,15 @@ private[ml] class BlockHingeAggregator( @transient private lazy val linear = { val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray - Vectors.dense(linear).toDense + Vectors.dense(linear) } /** - * Add a new training instance block to this HingeAggregator, and update the loss and gradient - * of the objective function. + * Add a new training instance block to this BlockHingeAggregator, and update the loss and + * gradient of the objective function. * * @param block The InstanceBlock to be added. - * @return This HingeAggregator object. + * @return This BlockHingeAggregator object. */ def add(block: InstanceBlock): this.type = { require(block.matrix.isTransposed) @@ -163,7 +163,6 @@ private[ml] class BlockHingeAggregator( // in-place convert dotProducts to gradient scales // then, vec represents gradient scales var i = 0 - var interceptGradSum = 0.0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { @@ -172,12 +171,11 @@ private[ml] class BlockHingeAggregator( // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) val labelScaled = label + label - 1.0 - val loss = (1.0 - labelScaled * vec.values(i)) * weight + val loss = (1.0 - labelScaled * vec(i)) * weight if (loss > 0) { lossSum += loss val gradScale = -labelScaled * weight vec.values(i) = gradScale - if (fitIntercept) interceptGradSum += gradScale } else { vec.values(i) = 0.0 } } else { vec.values(i) = 0.0 } i += 1 @@ -190,20 +188,20 @@ private[ml] class BlockHingeAggregator( case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, vec.values, 1, 1.0, gradientSumArray, 1) - if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum case sm: SparseMatrix if fitIntercept => val linearGradSumVec = Vectors.zeros(numFeatures).toDense BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) - gradientSumArray(numFeatures) += interceptGradSum case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) } + if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum + this } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 05a7999cffb3..0bd08bb5cb8f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -370,7 +370,7 @@ private[ml] class LogisticAggregator( * BlockLogisticAggregator computes the gradient and loss used in 
Logistic classification * for blocks in sparse or dense matrix in an online fashion. * - * Two BlockLogisticAggregator can be merged together to have a summary of loss and gradient of + * Two BlockLogisticAggregators can be merged together to have a summary of loss and gradient of * the corresponding joint dataset. * * NOTE: The feature values are expected to be standardized before computation. @@ -413,8 +413,8 @@ private[ml] class BlockLogisticAggregator( } @transient private lazy val binaryLinear = (multinomial, fitIntercept) match { - case (false, true) => Vectors.dense(coefficientsArray.take(numFeatures)).toDense - case (false, false) => Vectors.dense(coefficientsArray).toDense + case (false, true) => Vectors.dense(coefficientsArray.take(numFeatures)) + case (false, false) => Vectors.dense(coefficientsArray) case _ => null } @@ -428,11 +428,11 @@ private[ml] class BlockLogisticAggregator( } /** - * Add a new training instance block to this LogisticAggregator, and update the loss and gradient - * of the objective function. + * Add a new training instance block to this BlockLogisticAggregator, and update the loss and + * gradient of the objective function. * * @param block The instance block of data point to be added. - * @return This LogisticAggregator object. + * @return This BlockLogisticAggregator object. */ def add(block: InstanceBlock): this.type = { require(block.matrix.isTransposed) @@ -467,7 +467,6 @@ private[ml] class BlockLogisticAggregator( // in-place convert margins to multiplier // then, vec represents multiplier var i = 0 - var interceptGradSum = 0.0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { @@ -482,7 +481,6 @@ private[ml] class BlockLogisticAggregator( } val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) vec.values(i) = multiplier - if (fitIntercept) interceptGradSum += multiplier } else { vec.values(i) = 0.0 } i += 1 } @@ -494,19 +492,19 @@ private[ml] class BlockLogisticAggregator( case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, vec.values, 1, 1.0, gradientSumArray, 1) - if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum case sm: SparseMatrix if fitIntercept => val linearGradSumVec = Vectors.zeros(numFeatures).toDense BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) - gradientSumArray(numFeatures) += interceptGradSum case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) } + + if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum } /** Update gradient and loss using multinomial (softmax) loss function. 
*/ From 8f6582a36632763084fbe2a1c2f525d074eef611 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 6 May 2020 18:31:48 +0800 Subject: [PATCH 4/4] nit --- .../apache/spark/ml/optim/aggregator/LogisticAggregator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 0bd08bb5cb8f..a331122776b5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -472,7 +472,7 @@ private[ml] class BlockLogisticAggregator( if (weight > 0) { weightSum += weight val label = block.getLabel(i) - val margin = vec.values(i) + val margin = vec(i) if (label > 0) { // The following is equivalent to log(1 + exp(margin)) but more numerically stable. lossSum += weight * Utils.log1pExp(margin)
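
For reference, the numerical identity behind `Utils.log1pExp`, which the final hunk routes the binomial loss through: for positive margins the naive `math.log(1 + math.exp(x))` overflows once `math.exp(x)` exceeds the Double range, while the rewritten form only ever exponentiates a non-positive argument. A minimal standalone Scala sketch (the object name and the sample margin of 800.0 are illustrative, not part of the patch):

object Log1pExpSketch {
  // Stable log(1 + exp(x)): for positive x, pull x out so that exp only ever
  // sees a non-positive argument and cannot overflow.
  def log1pExp(x: Double): Double =
    if (x > 0) x + math.log1p(math.exp(-x)) else math.log1p(math.exp(x))

  def main(args: Array[String]): Unit = {
    val margin = 800.0
    println(math.log(1.0 + math.exp(margin))) // Infinity: math.exp(800.0) overflows
    println(log1pExp(margin))                 // 800.0: stays finite
  }
}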
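
A note on the `interceptGradSum` removal in patch 3/4: inside the loop every row that contributes nothing has its multiplier explicitly zeroed (`vec.values(i) = 0.0`), so after the loop the intercept component of the block gradient is simply the sum of the multiplier vector, which `gradientSumArray(numFeatures) += vec.values.sum` now adds once per block instead of accumulating per row. A tiny hypothetical snippet of that equivalence (values made up):

// Multipliers for one block after the loop; skipped rows were zeroed out.
val multipliers = Array(0.3, 0.0, -0.1, 0.0)
// Same result as the removed per-row interceptGradSum accumulator.
val interceptGrad = multipliers.sum  // ~0.2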
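
And a hypothetical caller-side sketch of the new `blockSize` param; the DataFrame name `training` and the value 64 are assumptions for illustration, not taken from the patch:

import org.apache.spark.ml.classification.LogisticRegression

// `training` is assumed to be an existing DataFrame with label/features columns.
val lr = new LogisticRegression()
  .setMaxIter(100)
  .setBlockSize(64)  // blockSize > 1 stacks rows into InstanceBlocks during training
val model = lr.fit(training)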