From f8782a4e30769f415be58818d2abe21b3f7419e1 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 28 Oct 2020 16:15:30 +0800 Subject: [PATCH 1/4] aft_lir_lor rename param add comment --- .../classification/LogisticRegression.scala | 110 ++++---------- .../ml/optim/aggregator/AFTAggregator.scala | 17 +-- .../ml/regression/AFTSurvivalRegression.scala | 126 ++++++---------- .../ml/regression/LinearRegression.scala | 134 +++++------------- .../classification/LogisticRegression.scala | 4 +- .../LogisticRegressionSuite.scala | 8 +- .../AFTSurvivalRegressionSuite.scala | 4 +- .../ml/regression/LinearRegressionSuite.scala | 4 +- python/pyspark/ml/classification.py | 22 +-- python/pyspark/ml/classification.pyi | 8 +- python/pyspark/ml/regression.py | 44 +++--- python/pyspark/ml/regression.pyi | 18 +-- 12 files changed, 170 insertions(+), 329 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index a43ad466a7c8..db97e24f9259 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -50,7 +50,7 @@ import org.apache.spark.util.VersionUtils private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth - with HasBlockSize { + with HasMaxBlockSizeInMB { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -245,7 +245,7 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas setDefault(regParam -> 0.0, elasticNetParam -> 0.0, maxIter -> 100, tol -> 1E-6, fitIntercept -> true, family -> "auto", standardization -> true, threshold -> 0.5, - aggregationDepth -> 2, blockSize -> 1) + aggregationDepth -> 2, maxBlockSizeInMB -> 0.0) protected def usingBoundConstrainedOptimization: Boolean = { isSet(lowerBoundsOnCoefficients) || isSet(upperBoundsOnCoefficients) || @@ -426,22 +426,13 @@ class LogisticRegression @Since("1.2.0") ( def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) /** - * Set block size for stacking input data in matrices. - * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; - * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines - * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). - * Recommended size is between 10 and 1000. An appropriate choice of the block size depends - * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, - * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). - * Note that existing BLAS implementations are mainly optimized for dense matrices, if the - * input dataset is sparse, stacking may bring no performance gain, the worse is possible - * performance regression. - * Default is 1. + * Sets the value of param [[maxBlockSizeInMB]]. + * Default is 0.0. 
* * @group expertSetParam */ @Since("3.1.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) + def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value) private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, @@ -495,31 +486,24 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - train(dataset, handlePersistence) - } - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr => + dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr => instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, probabilityCol, regParam, elasticNetParam, standardization, threshold, thresholds, maxIter, - tol, fitIntercept, blockSize) + tol, fitIntercept, maxBlockSizeInMB) + + if (dataset.storageLevel != StorageLevel.NONE) { + instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + + s"then cached during training. Be careful of double caching!") + } val instances = extractInstances(dataset) .setName("training instances") - if (handlePersistence && $(blockSize) == 1) { - instances.persist(StorageLevel.MEMORY_AND_DISK) - } - - var requestedMetrics = Seq("mean", "std", "count") - if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros" val (summarizer, labelSummarizer) = Summarizer - .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics) + .getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count")) val numFeatures = summarizer.mean.size val histogram = labelSummarizer.histogram @@ -547,14 +531,13 @@ class LogisticRegression @Since("1.2.0") ( instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) instr.logSumOfWeights(summarizer.weightSum) - if ($(blockSize) > 1) { - val scale = 1.0 / summarizer.count / numFeatures - val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum - instr.logNamedValue("sparsity", sparsity.toString) - if (sparsity > 0.5) { - instr.logWarning(s"sparsity of input dataset is $sparsity, " + - s"which may hurt performance in high-level BLAS.") - } + + var actualBlockSizeInMB = $(maxBlockSizeInMB) + if (actualBlockSizeInMB == 0) { + // TODO: for Multinomial logistic regression, take numClasses into account + actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB + require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") + instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString) } val isMultinomial = checkMultinomial(numClasses) @@ -584,7 +567,6 @@ class LogisticRegression @Since("1.2.0") ( } else { Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity) } - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() return createModel(dataset, numClasses, coefMatrix, interceptVec, Array(0.0)) } @@ -636,14 +618,9 @@ class LogisticRegression @Since("1.2.0") ( Note that the intercept in scaled space and original space is the same; as a result, no scaling is needed. 
*/ - val (allCoefficients, objectiveHistory) = if ($(blockSize) == 1) { - trainOnRows(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix, - regularization, optimizer) - } else { - trainOnBlocks(instances, featuresStd, numClasses, initialCoefWithInterceptMatrix, - regularization, optimizer) - } - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() + val (allCoefficients, objectiveHistory) = + trainImpl(instances, actualBlockSizeInMB, featuresStd, numClasses, + initialCoefWithInterceptMatrix, regularization, optimizer) if (allCoefficients == null) { val msg = s"${optimizer.getClass.getName} failed." @@ -949,40 +926,9 @@ class LogisticRegression @Since("1.2.0") ( initialCoefWithInterceptMatrix } - private def trainOnRows( - instances: RDD[Instance], - featuresStd: Array[Double], - numClasses: Int, - initialCoefWithInterceptMatrix: Matrix, - regularization: Option[L2Regularization], - optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = { - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), - checkMultinomial(numClasses))(_) - - val costFun = new RDDLossFunction(instances, getAggregatorFunc, - regularization, $(aggregationDepth)) - val states = optimizer.iterations(new CachedDiffFunction(costFun), - new BDV[Double](initialCoefWithInterceptMatrix.toArray)) - - /* - Note that in Logistic Regression, the objective history (loss + regularization) - is log-likelihood which is invariant under feature standardization. As a result, - the objective history from optimizer is the same as the one in the original space. - */ - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - bcFeaturesStd.destroy() - - (if (state == null) null else state.x.toArray, arrayBuilder.result) - } - - private def trainOnBlocks( + private def trainImpl( instances: RDD[Instance], + actualBlockSizeInMB: Double, featuresStd: Array[Double], numClasses: Int, initialCoefWithInterceptMatrix: Matrix, @@ -996,9 +942,11 @@ class LogisticRegression @Since("1.2.0") ( val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + + val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage) .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training blocks (blockSize=${$(blockSize)})") + .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") val getAggregatorFunc = new BlockLogisticAggregator(numFeatures, numClasses, $(fitIntercept), checkMultinomial(numClasses))(_) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala index 8a5d7fe34e7a..db2f91af6b54 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala @@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.regression.AFTPoint /** * AFTAggregator 
computes the gradient and loss for a AFT loss function, @@ -108,7 +108,7 @@ import org.apache.spark.ml.regression.AFTPoint private[ml] class AFTAggregator( bcFeaturesStd: Broadcast[Array[Double]], fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[AFTPoint, AFTAggregator] { + extends DifferentiableLossAggregator[Instance, AFTAggregator] { protected override val dim: Int = bcCoefficients.value.size @@ -116,10 +116,10 @@ private[ml] class AFTAggregator( * Add a new training data to this AFTAggregator, and update the loss and gradient * of the objective function. * - * @param data The AFTPoint representation for one data point to be added into this aggregator. + * @param data The Instance representation for one data point to be added into this aggregator. * @return This AFTAggregator object. */ - def add(data: AFTPoint): this.type = { + def add(data: Instance): this.type = { val coefficients = bcCoefficients.value.toArray val intercept = coefficients(dim - 2) // sigma is the scale parameter of the AFT model @@ -127,7 +127,7 @@ private[ml] class AFTAggregator( val xi = data.features val ti = data.label - val delta = data.censor + val delta = data.weight require(ti > 0.0, "The lifetime or label should be greater than 0.") @@ -176,7 +176,7 @@ private[ml] class AFTAggregator( */ private[ml] class BlockAFTAggregator( fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) - extends DifferentiableLossAggregator[(Matrix, Array[Double], Array[Double]), + extends DifferentiableLossAggregator[InstanceBlock, BlockAFTAggregator] { protected override val dim: Int = bcCoefficients.value.size @@ -196,8 +196,9 @@ private[ml] class BlockAFTAggregator( * * @return This BlockAFTAggregator object. */ - def add(block: (Matrix, Array[Double], Array[Double])): this.type = { - val (matrix, labels, censors) = block + def add(block: InstanceBlock): this.type = { + // here use Instance.weight to store censor for convenience + val (matrix, labels, censors) = (block.matrix, block.labels, block.weightIter.toArray) require(matrix.isTransposed) require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " + s"instance. Expecting $numFeatures but got ${matrix.numCols}.") diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 3870a71a91a2..fbd9fa24b2f3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams -import org.apache.spark.ml.feature.StandardScalerModel +import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator._ import org.apache.spark.ml.optim.loss.RDDLossFunction @@ -47,8 +47,8 @@ import org.apache.spark.storage.StorageLevel * Params for accelerated failure time (AFT) regression. */ private[regression] trait AFTSurvivalRegressionParams extends PredictorParams - with HasMaxIter with HasTol with HasFitIntercept with HasAggregationDepth with HasBlockSize - with Logging { + with HasMaxIter with HasTol with HasFitIntercept with HasAggregationDepth + with HasMaxBlockSizeInMB with Logging { /** * Param for censor column name. 
@@ -92,7 +92,8 @@ private[regression] trait AFTSurvivalRegressionParams extends PredictorParams setDefault(censorCol -> "censor", quantileProbabilities -> Array(0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99), - fitIntercept -> true, maxIter -> 100, tol -> 1E-6, aggregationDepth -> 2, blockSize -> 1) + fitIntercept -> true, maxIter -> 100, tol -> 1E-6, aggregationDepth -> 2, + maxBlockSizeInMB -> 0.0) /** Checks whether the input has quantiles column name. */ private[regression] def hasQuantilesCol: Boolean = { @@ -184,55 +185,39 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) /** - * Set block size for stacking input data in matrices. - * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; - * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines - * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). - * Recommended size is between 10 and 1000. An appropriate choice of the block size depends - * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, - * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). - * Note that existing BLAS implementations are mainly optimized for dense matrices, if the - * input dataset is sparse, stacking may bring no performance gain, the worse is possible - * performance regression. - * Default is 1. + * Sets the value of param [[maxBlockSizeInMB]]. + * Default is 0.0. * * @group expertSetParam */ @Since("3.1.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) - - /** - * Extract [[featuresCol]], [[labelCol]] and [[censorCol]] from input dataset, - * and put it in an RDD with strong types. - */ - protected[ml] def extractAFTPoints(dataset: Dataset[_]): RDD[AFTPoint] = { - dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType), - col($(censorCol)).cast(DoubleType)).rdd.map { - case Row(features: Vector, label: Double, censor: Double) => - AFTPoint(features, label, censor) - } - } + def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value) override protected def train( dataset: Dataset[_]): AFTSurvivalRegressionModel = instrumented { instr => instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, featuresCol, censorCol, predictionCol, quantilesCol, - fitIntercept, maxIter, tol, aggregationDepth, blockSize) + fitIntercept, maxIter, tol, aggregationDepth, maxBlockSizeInMB) instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length) - val instances = extractAFTPoints(dataset) - .setName("training instances") - - if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) { - instances.persist(StorageLevel.MEMORY_AND_DISK) + if (dataset.storageLevel != StorageLevel.NONE) { + instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + + s"then cached during training. 
Be careful of double caching!") } - var requestedMetrics = Seq("mean", "std", "count") - if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros" + val instances = dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType), + col($(censorCol)).cast(DoubleType)) + .rdd.map { case Row(features: Vector, label: Double, censor: Double) => + require(censor == 1.0 || censor == 0.0, "censor must be 1.0 or 0.0") + // AFT does not support instance weighting, + // here use Instance.weight to store censor for convenience + Instance(label, censor, features) + }.setName("training instances") + val summarizer = instances.treeAggregate( - Summarizer.createSummarizerBuffer(requestedMetrics: _*))( - seqOp = (c: SummarizerBuffer, v: AFTPoint) => c.add(v.features), + Summarizer.createSummarizerBuffer("mean", "std", "count"))( + seqOp = (c: SummarizerBuffer, i: Instance) => c.add(i.features), combOp = (c1: SummarizerBuffer, c2: SummarizerBuffer) => c1.merge(c2), depth = $(aggregationDepth) ) @@ -241,14 +226,12 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S val numFeatures = featuresStd.length instr.logNumFeatures(numFeatures) instr.logNumExamples(summarizer.count) - if ($(blockSize) > 1) { - val scale = 1.0 / summarizer.count / numFeatures - val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum - instr.logNamedValue("sparsity", sparsity.toString) - if (sparsity > 0.5) { - instr.logWarning(s"sparsity of input dataset is $sparsity, " + - s"which may hurt performance in high-level BLAS.") - } + + var actualBlockSizeInMB = $(maxBlockSizeInMB) + if (actualBlockSizeInMB == 0) { + actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB + require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") + instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString) } if (!$(fitIntercept) && (0 until numFeatures).exists { i => @@ -268,12 +251,8 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S */ val initialParameters = Vectors.zeros(numFeatures + 2) - val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) { - trainOnRows(instances, featuresStd, optimizer, initialParameters) - } else { - trainOnBlocks(instances, featuresStd, optimizer, initialParameters) - } - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() + val (rawCoefficients, objectiveHistory) = + trainImpl(instances, actualBlockSizeInMB, featuresStd, optimizer, initialParameters) if (rawCoefficients == null) { val msg = s"${optimizer.getClass.getName} failed." 
@@ -290,47 +269,24 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale) } - private def trainOnRows( - instances: RDD[AFTPoint], + private def trainImpl( + instances: RDD[Instance], + actualBlockSizeInMB: Double, featuresStd: Array[Double], optimizer: BreezeLBFGS[BDV[Double]], initialParameters: Vector): (Array[Double], Array[Double]) = { val bcFeaturesStd = instances.context.broadcast(featuresStd) - val getAggregatorFunc = new AFTAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, None, $(aggregationDepth)) - - val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialParameters.asBreeze.toDenseVector) - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - bcFeaturesStd.destroy() - - (if (state != null) state.x.toArray else null, arrayBuilder.result) - } - - private def trainOnBlocks( - instances: RDD[AFTPoint], - featuresStd: Array[Double], - optimizer: BreezeLBFGS[BDV[Double]], - initialParameters: Vector): (Array[Double], Array[Double]) = { - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val blocks = instances.mapPartitions { iter => + val standardized = instances.mapPartitions { iter => val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 } val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) - iter.grouped($(blockSize)).map { seq => - val matrix = Matrices.fromVectors(seq.map(point => func(point.features))) - val labels = seq.map(_.label).toArray - val censors = seq.map(_.censor).toArray - (matrix, labels, censors) - } + iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - blocks.persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training blocks (blockSize=${$(blockSize)})") + + val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") val getAggregatorFunc = new BlockAFTAggregator($(fitIntercept))(_) val costFun = new RDDLossFunction(blocks, getAggregatorFunc, None, $(aggregationDepth)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 235a7f9b6ebd..04d185fb443a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion private[regression] trait LinearRegressionParams extends PredictorParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver - with HasAggregationDepth with HasLoss with HasBlockSize { + with HasAggregationDepth with HasLoss with HasMaxBlockSizeInMB { import LinearRegression._ @@ -107,7 +107,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams setDefault(regParam -> 0.0, fitIntercept -> true, standardization -> true, elasticNetParam -> 0.0, maxIter -> 100, tol -> 1E-6, solver -> Auto, - aggregationDepth -> 2, loss -> SquaredError, 
epsilon -> 1.35, blockSize -> 1) + aggregationDepth -> 2, loss -> SquaredError, epsilon -> 1.35, maxBlockSizeInMB -> 0.0) override protected def validateAndTransformSchema( schema: StructType, @@ -312,29 +312,26 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setEpsilon(value: Double): this.type = set(epsilon, value) /** - * Set block size for stacking input data in matrices. - * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; - * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines - * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). - * Recommended size is between 10 and 1000. An appropriate choice of the block size depends - * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, - * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). - * Note that existing BLAS implementations are mainly optimized for dense matrices, if the - * input dataset is sparse, stacking may bring no performance gain, the worse is possible - * performance regression. - * Default is 1. + * Sets the value of param [[maxBlockSizeInMB]]. + * Default is 0.0. * * @group expertSetParam */ @Since("3.1.0") - def setBlockSize(value: Int): this.type = set(blockSize, value) + def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value) - override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr => + override protected def train( + dataset: Dataset[_]): LinearRegressionModel = instrumented { instr => instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, solver, tol, elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss, - epsilon, blockSize) + epsilon, maxBlockSizeInMB) + + if (dataset.storageLevel != StorageLevel.NONE) { + instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " + + s"then cached during training. Be careful of double caching!") + } // Extract the number of features before deciding optimization solver. 
val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol)) @@ -348,35 +345,26 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val instances = extractInstances(dataset) .setName("training instances") - if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) { - instances.persist(StorageLevel.MEMORY_AND_DISK) - } + val (summarizer, labelSummarizer) = Summarizer + .getRegressionSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count")) - var requestedMetrics = Seq("mean", "std", "count") - if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros" - val (featuresSummarizer, ySummarizer) = Summarizer - .getRegressionSummarizers(instances, $(aggregationDepth), requestedMetrics) + val yMean = labelSummarizer.mean(0) + val rawYStd = labelSummarizer.std(0) - val yMean = ySummarizer.mean(0) - val rawYStd = ySummarizer.std(0) - - instr.logNumExamples(ySummarizer.count) + instr.logNumExamples(labelSummarizer.count) instr.logNamedValue(Instrumentation.loggerTags.meanOfLabels, yMean) instr.logNamedValue(Instrumentation.loggerTags.varianceOfLabels, rawYStd) - instr.logSumOfWeights(featuresSummarizer.weightSum) - if ($(blockSize) > 1) { - val scale = 1.0 / featuresSummarizer.count / numFeatures - val sparsity = 1 - featuresSummarizer.numNonzeros.toArray.map(_ * scale).sum - instr.logNamedValue("sparsity", sparsity.toString) - if (sparsity > 0.5) { - instr.logWarning(s"sparsity of input dataset is $sparsity, " + - s"which may hurt performance in high-level BLAS.") - } + instr.logSumOfWeights(summarizer.weightSum) + + var actualBlockSizeInMB = $(maxBlockSizeInMB) + if (actualBlockSizeInMB == 0) { + actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB + require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0") + instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString) } if (rawYStd == 0.0) { if ($(fitIntercept) || yMean == 0.0) { - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() return trainWithConstantLabel(dataset, instr, numFeatures, yMean) } else { require($(regParam) == 0.0, "The standard deviation of the label is zero. " + @@ -389,8 +377,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String // if y is constant (rawYStd is zero), then y cannot be scaled. In this case // setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm. 
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean) - val featuresMean = featuresSummarizer.mean.toArray - val featuresStd = featuresSummarizer.std.toArray + val featuresMean = summarizer.mean.toArray + val featuresStd = summarizer.std.toArray if (!$(fitIntercept) && (0 until numFeatures).exists { i => featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) { @@ -426,14 +414,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String Vectors.dense(Array.fill(dim)(1.0)) } - val (parameters, objectiveHistory) = if ($(blockSize) == 1) { - trainOnRows(instances, yMean, yStd, featuresMean, featuresStd, - initialValues, regularization, optimizer) - } else { - trainOnBlocks(instances, yMean, yStd, featuresMean, featuresStd, - initialValues, regularization, optimizer) - } - if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() + val (parameters, objectiveHistory) = + trainImpl(instances, actualBlockSizeInMB, yMean, yStd, + featuresMean, featuresStd, initialValues, regularization, optimizer) if (parameters == null) { val msg = s"${optimizer.getClass.getName} failed." @@ -541,56 +524,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String } } - private def trainOnRows( - instances: RDD[Instance], - yMean: Double, - yStd: Double, - featuresMean: Array[Double], - featuresStd: Array[Double], - initialValues: Vector, - regularization: Option[L2Regularization], - optimizer: FirstOrderMinimizer[BDV[Double], DiffFunction[BDV[Double]]]) = { - val bcFeaturesMean = instances.context.broadcast(featuresMean) - val bcFeaturesStd = instances.context.broadcast(featuresStd) - - val costFun = $(loss) match { - case SquaredError => - val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept), - bcFeaturesStd, bcFeaturesMean)(_) - new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) - case Huber => - val getAggregatorFunc = new HuberAggregator($(fitIntercept), $(epsilon), bcFeaturesStd)(_) - new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) - } - - val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialValues.asBreeze.toDenseVector) - - /* - Note that in Linear Regression, the objective history (loss + regularization) returned - from optimizer is computed in the scaled space given by the following formula. -
- $$ - L &= 1/2n||\sum_i w_i(x_i - \bar{x_i}) / \hat{x_i} - (y - \bar{y}) / \hat{y}||^2 - + regTerms \\ - $$ -
- */ - val arrayBuilder = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - arrayBuilder += state.adjustedValue - } - - bcFeaturesMean.destroy() - bcFeaturesStd.destroy() - - (if (state == null) null else state.x.toArray, arrayBuilder.result) - } - - private def trainOnBlocks( + private def trainImpl( instances: RDD[Instance], + actualBlockSizeInMB: Double, yMean: Double, yStd: Double, featuresMean: Array[Double], @@ -606,9 +542,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } - val blocks = InstanceBlock.blokify(standardized, $(blockSize)) + + val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong + val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage) .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training blocks (blockSize=${$(blockSize)})") + .setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)") val costFun = $(loss) match { case SquaredError => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 21eb17dfaacb..f88f3fce61b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -339,10 +339,8 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 51a6ae3c7e49..d0b282db1ece 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -593,8 +593,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { .setMaxIter(5) .setFamily("multinomial") val model = mlor.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = mlor.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = mlor.setMaxBlockSizeInMB(s).fit(dataset) assert(model.interceptVector ~== model2.interceptVector relTol 1e-6) assert(model.coefficientMatrix ~== model2.coefficientMatrix relTol 1e-6) } @@ -606,8 +606,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest { .setMaxIter(5) .setFamily("binomial") val model = blor.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = blor.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = blor.setMaxBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-6) 
assert(model.coefficients ~== model2.coefficients relTol 1e-6) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index 63ccfa383462..e745e7f67df9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -436,8 +436,8 @@ class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest { .setQuantileProbabilities(quantileProbabilities) .setQuantilesCol("quantiles") val model = aft.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = aft.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = aft.setMaxBlockSizeInMB(s).fit(dataset) assert(model.coefficients ~== model2.coefficients relTol 1e-9) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.scale ~== model2.scale relTol 1e-9) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index fb70883bffc5..b3098be0a36f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -672,8 +672,8 @@ class LinearRegressionSuite extends MLTest with DefaultReadWriteTest with PMMLRe .setLoss(loss) .setMaxIter(3) val model = lir.fit(dataset) - Seq(4, 16, 64).foreach { blockSize => - val model2 = lir.setBlockSize(blockSize).fit(dataset) + Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s => + val model2 = lir.setMaxBlockSizeInMB(s).fit(dataset) assert(model.intercept ~== model2.intercept relTol 1e-9) assert(model.coefficients ~== model2.coefficients relTol 1e-9) assert(model.scale ~== model2.scale relTol 1e-9) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 8f13f3275cb5..50882fc895d6 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -783,7 +783,7 @@ class LinearSVCTrainingSummary(LinearSVCSummary, _TrainingSummary): class _LogisticRegressionParams(_ProbabilisticClassifierParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol, HasStandardization, HasWeightCol, HasAggregationDepth, - HasThreshold, HasBlockSize): + HasThreshold, HasMaxBlockSizeInMB): """ Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`. @@ -836,7 +836,7 @@ class _LogisticRegressionParams(_ProbabilisticClassifierParams, HasRegParam, def __init__(self, *args): super(_LogisticRegressionParams, self).__init__(*args) self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto", - blockSize=1) + maxBlockSizeInMB=0.0) @since("1.4.0") def setThreshold(self, value): @@ -980,8 +980,8 @@ class LogisticRegression(_JavaProbabilisticClassifier, _LogisticRegressionParams LogisticRegressionModel... >>> blorModel.getProbabilityCol() 'newProbability' - >>> blorModel.getBlockSize() - 1 + >>> blorModel.getMaxBlockSizeInMB() + 0.0 >>> blorModel.setThreshold(0.1) LogisticRegressionModel... 
>>> blorModel.getThreshold() @@ -1047,7 +1047,7 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, - blockSize=1): + maxBlockSizeInMB=0.0): """ __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ @@ -1057,7 +1057,7 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, \ - blockSize=1): + maxBlockSizeInMB=0.0): If the threshold and thresholds Params are both set, they must be equivalent. """ super(LogisticRegression, self).__init__() @@ -1076,7 +1076,7 @@ def setParams(self, *, featuresCol="features", labelCol="label", predictionCol=" aggregationDepth=2, family="auto", lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, - blockSize=1): + maxBlockSizeInMB=0.0): """ setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ @@ -1085,7 +1085,7 @@ def setParams(self, *, featuresCol="features", labelCol="label", predictionCol=" aggregationDepth=2, family="auto", \ lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \ lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, \ - blockSize=1): + maxBlockSizeInMB=0.0): Sets params for logistic regression. If the threshold and thresholds Params are both set, they must be equivalent. """ @@ -1181,11 +1181,11 @@ def setAggregationDepth(self, value): return self._set(aggregationDepth=value) @since("3.1.0") - def setBlockSize(self, value): + def setMaxBlockSizeInMB(self, value): """ - Sets the value of :py:attr:`blockSize`. + Sets the value of :py:attr:`maxBlockSizeInMB`. """ - return self._set(blockSize=value) + return self._set(maxBlockSizeInMB=value) class LogisticRegressionModel(_JavaProbabilisticClassificationModel, _LogisticRegressionParams, diff --git a/python/pyspark/ml/classification.pyi b/python/pyspark/ml/classification.pyi index 9f72d24f6311..4bde851bb1e0 100644 --- a/python/pyspark/ml/classification.pyi +++ b/python/pyspark/ml/classification.pyi @@ -257,7 +257,7 @@ class _LogisticRegressionParams( HasWeightCol, HasAggregationDepth, HasThreshold, - HasBlockSize, + HasMaxBlockSizeInMB, ): threshold: Param[float] family: Param[str] @@ -305,7 +305,7 @@ class LogisticRegression( upperBoundsOnCoefficients: Optional[Matrix] = ..., lowerBoundsOnIntercepts: Optional[Vector] = ..., upperBoundsOnIntercepts: Optional[Vector] = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> None: ... def setParams( self, @@ -330,7 +330,7 @@ class LogisticRegression( upperBoundsOnCoefficients: Optional[Matrix] = ..., lowerBoundsOnIntercepts: Optional[Vector] = ..., upperBoundsOnIntercepts: Optional[Vector] = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> LogisticRegression: ... def setFamily(self, value: str) -> LogisticRegression: ... def setLowerBoundsOnCoefficients(self, value: Matrix) -> LogisticRegression: ... @@ -345,7 +345,7 @@ class LogisticRegression( def setStandardization(self, value: bool) -> LogisticRegression: ... def setWeightCol(self, value: str) -> LogisticRegression: ... 
def setAggregationDepth(self, value: int) -> LogisticRegression: ... - def setBlockSize(self, value: int) -> LogisticRegression: ... + def setMaxBlockSizeInMB(self, value: float) -> LogisticRegression: ... class LogisticRegressionModel( _JavaProbabilisticClassificationModel[Vector], diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index d1a5852fd65b..734c00cd1a92 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -24,7 +24,7 @@ from pyspark.ml.base import _PredictorParams from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasWeightCol, \ Param, Params, TypeConverters, HasMaxIter, HasTol, HasFitIntercept, HasAggregationDepth, \ - HasBlockSize, HasRegParam, HasSolver, HasStepSize, HasSeed, HasElasticNetParam, \ + HasMaxBlockSizeInMB, HasRegParam, HasSolver, HasStepSize, HasSeed, HasElasticNetParam, \ HasStandardization, HasLoss, HasVarianceCol from pyspark.ml.tree import _DecisionTreeModel, _DecisionTreeParams, \ _TreeEnsembleModel, _RandomForestParams, _GBTParams, _TreeRegressorParams @@ -87,7 +87,7 @@ class _JavaRegressionModel(RegressionModel, JavaPredictionModel, metaclass=ABCMe class _LinearRegressionParams(_PredictorParams, HasRegParam, HasElasticNetParam, HasMaxIter, HasTol, HasFitIntercept, HasStandardization, HasWeightCol, HasSolver, - HasAggregationDepth, HasLoss, HasBlockSize): + HasAggregationDepth, HasLoss, HasMaxBlockSizeInMB): """ Params for :py:class:`LinearRegression` and :py:class:`LinearRegressionModel`. @@ -166,8 +166,8 @@ class LinearRegression(_JavaRegressor, _LinearRegressionParams, JavaMLWritable, LinearRegressionModel... >>> model.getMaxIter() 5 - >>> model.getBlockSize() - 1 + >>> model.getMaxBlockSizeInMB() + 0.0 >>> test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], ["features"]) >>> abs(model.predict(test0.head().features) - (-1.0)) < 0.001 True @@ -207,12 +207,12 @@ class LinearRegression(_JavaRegressor, _LinearRegressionParams, JavaMLWritable, def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2, - loss="squaredError", epsilon=1.35, blockSize=1): + loss="squaredError", epsilon=1.35, maxBlockSizeInMB=0.0): """ __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \ - loss="squaredError", epsilon=1.35, blockSize=1) + loss="squaredError", epsilon=1.35, maxBlockSizeInMB=0.0) """ super(LinearRegression, self).__init__() self._java_obj = self._new_java_obj( @@ -225,12 +225,12 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, standardization=True, solver="auto", weightCol=None, aggregationDepth=2, - loss="squaredError", epsilon=1.35, blockSize=1): + loss="squaredError", epsilon=1.35, maxBlockSizeInMB=0.0): """ setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \ standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \ - loss="squaredError", epsilon=1.35, 
blockSize=1) + loss="squaredError", epsilon=1.35, maxBlockSizeInMB=0.0) Sets params for linear regression. """ kwargs = self._input_kwargs @@ -307,11 +307,11 @@ def setLoss(self, value): return self._set(lossType=value) @since("3.1.0") - def setBlockSize(self, value): + def setMaxBlockSizeInMB(self, value): """ - Sets the value of :py:attr:`blockSize`. + Sets the value of :py:attr:`maxBlockSizeInMB`. """ - return self._set(blockSize=value) + return self._set(maxBlockSizeInMB=value) class LinearRegressionModel(_JavaRegressionModel, _LinearRegressionParams, GeneralJavaMLWritable, @@ -1683,7 +1683,7 @@ def evaluateEachIteration(self, dataset, loss): class _AFTSurvivalRegressionParams(_PredictorParams, HasMaxIter, HasTol, HasFitIntercept, - HasAggregationDepth, HasBlockSize): + HasAggregationDepth, HasMaxBlockSizeInMB): """ Params for :py:class:`AFTSurvivalRegression` and :py:class:`AFTSurvivalRegressionModel`. @@ -1710,7 +1710,7 @@ def __init__(self, *args): super(_AFTSurvivalRegressionParams, self).__init__(*args) self._setDefault(censorCol="censor", quantileProbabilities=[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], - maxIter=100, tol=1E-6, blockSize=1) + maxIter=100, tol=1E-6, maxBlockSizeInMB=0.0) @since("1.6.0") def getCensorCol(self): @@ -1762,8 +1762,8 @@ class AFTSurvivalRegression(_JavaRegressor, _AFTSurvivalRegressionParams, 10 >>> aftsr.clear(aftsr.maxIter) >>> model = aftsr.fit(df) - >>> model.getBlockSize() - 1 + >>> model.getMaxBlockSizeInMB() + 0.0 >>> model.setFeaturesCol("features") AFTSurvivalRegressionModel... >>> model.predict(Vectors.dense(6.3)) @@ -1802,12 +1802,12 @@ class AFTSurvivalRegression(_JavaRegressor, _AFTSurvivalRegressionParams, def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", fitIntercept=True, maxIter=100, tol=1E-6, censorCol="censor", quantileProbabilities=list([0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]), - quantilesCol=None, aggregationDepth=2, blockSize=1): + quantilesCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0): """ __init__(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ fitIntercept=True, maxIter=100, tol=1E-6, censorCol="censor", \ quantileProbabilities=[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], \ - quantilesCol=None, aggregationDepth=2, blockSize=1) + quantilesCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0) """ super(AFTSurvivalRegression, self).__init__() self._java_obj = self._new_java_obj( @@ -1820,12 +1820,12 @@ def __init__(self, *, featuresCol="features", labelCol="label", predictionCol="p def setParams(self, *, featuresCol="features", labelCol="label", predictionCol="prediction", fitIntercept=True, maxIter=100, tol=1E-6, censorCol="censor", quantileProbabilities=list([0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99]), - quantilesCol=None, aggregationDepth=2, blockSize=1): + quantilesCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0): """ setParams(self, \\*, featuresCol="features", labelCol="label", predictionCol="prediction", \ fitIntercept=True, maxIter=100, tol=1E-6, censorCol="censor", \ quantileProbabilities=[0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], \ - quantilesCol=None, aggregationDepth=2, blockSize=1): + quantilesCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0): """ kwargs = self._input_kwargs return self._set(**kwargs) @@ -1883,11 +1883,11 @@ def setAggregationDepth(self, value): return self._set(aggregationDepth=value) @since("3.1.0") - def setBlockSize(self, value): + def setMaxBlockSizeInMB(self, 
value): """ - Sets the value of :py:attr:`blockSize`. + Sets the value of :py:attr:`maxBlockSizeInMB`. """ - return self._set(blockSize=value) + return self._set(maxBlockSizeInMB=value) class AFTSurvivalRegressionModel(_JavaRegressionModel, _AFTSurvivalRegressionParams, diff --git a/python/pyspark/ml/regression.pyi b/python/pyspark/ml/regression.pyi index 991eb4f12ac8..5cb0e7a5092f 100644 --- a/python/pyspark/ml/regression.pyi +++ b/python/pyspark/ml/regression.pyi @@ -24,7 +24,7 @@ from pyspark.ml import PredictionModel, Predictor from pyspark.ml.base import _PredictorParams from pyspark.ml.param.shared import ( HasAggregationDepth, - HasBlockSize, + HasMaxBlockSizeInMB, HasElasticNetParam, HasFeaturesCol, HasFitIntercept, @@ -86,7 +86,7 @@ class _LinearRegressionParams( HasSolver, HasAggregationDepth, HasLoss, - HasBlockSize, + HasMaxBlockSizeInMB, ): solver: Param[str] loss: Param[str] @@ -116,7 +116,7 @@ class LinearRegression( weightCol: Optional[str] = ..., aggregationDepth: int = ..., epsilon: float = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> None: ... def setParams( self, @@ -134,7 +134,7 @@ class LinearRegression( weightCol: Optional[str] = ..., aggregationDepth: int = ..., epsilon: float = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> LinearRegression: ... def setEpsilon(self, value: float) -> LinearRegression: ... def setMaxIter(self, value: int) -> LinearRegression: ... @@ -147,7 +147,7 @@ class LinearRegression( def setSolver(self, value: str) -> LinearRegression: ... def setAggregationDepth(self, value: int) -> LinearRegression: ... def setLoss(self, value: str) -> LinearRegression: ... - def setBlockSize(self, value: int) -> LinearRegression: ... + def setMaxBlockSizeInMB(self, value: float) -> LinearRegression: ... class LinearRegressionModel( _JavaRegressionModel[Vector], @@ -522,7 +522,7 @@ class _AFTSurvivalRegressionParams( HasTol, HasFitIntercept, HasAggregationDepth, - HasBlockSize, + HasMaxBlockSizeInMB, ): censorCol: Param[str] quantileProbabilities: Param[List[float]] @@ -551,7 +551,7 @@ class AFTSurvivalRegression( quantileProbabilities: List[float] = ..., quantilesCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> None: ... def setParams( self, @@ -566,7 +566,7 @@ class AFTSurvivalRegression( quantileProbabilities: List[float] = ..., quantilesCol: Optional[str] = ..., aggregationDepth: int = ..., - blockSize: int = ... + maxBlockSizeInMB: float = ... ) -> AFTSurvivalRegression: ... def setCensorCol(self, value: str) -> AFTSurvivalRegression: ... def setQuantileProbabilities(self, value: List[float]) -> AFTSurvivalRegression: ... @@ -575,7 +575,7 @@ class AFTSurvivalRegression( def setTol(self, value: float) -> AFTSurvivalRegression: ... def setFitIntercept(self, value: bool) -> AFTSurvivalRegression: ... def setAggregationDepth(self, value: int) -> AFTSurvivalRegression: ... - def setBlockSize(self, value: int) -> AFTSurvivalRegression: ... + def setMaxBlockSizeInMB(self, value: float) -> AFTSurvivalRegression: ... 
class AFTSurvivalRegressionModel( _JavaRegressionModel[Vector], From 5f9e70f5ef4584ca29cf42233ea82fe8870864dd Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Fri, 13 Nov 2020 09:37:16 +0800 Subject: [PATCH 2/4] fix py lir --- python/pyspark/ml/regression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 734c00cd1a92..5ce484d964a5 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -107,7 +107,7 @@ class _LinearRegressionParams(_PredictorParams, HasRegParam, HasElasticNetParam, def __init__(self, *args): super(_LinearRegressionParams, self).__init__(*args) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35, - blockSize=1) + maxBlockSizeInMB=0.0) @since("2.3.0") def getEpsilon(self): From 48b28146033b292f6b856d2443bf5b562854066a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 18 Nov 2020 13:10:36 +0800 Subject: [PATCH 3/4] update doc --- .../org/apache/spark/ml/classification/LinearSVC.scala | 6 +++++- .../apache/spark/ml/classification/LogisticRegression.scala | 6 +++++- .../apache/spark/ml/param/shared/SharedParamsCodeGen.scala | 4 ++-- .../org/apache/spark/ml/param/shared/sharedParams.scala | 4 ++-- .../apache/spark/ml/regression/AFTSurvivalRegression.scala | 6 +++++- .../org/apache/spark/ml/regression/LinearRegression.scala | 6 +++++- python/pyspark/ml/param/_shared_params_code_gen.py | 4 ++-- python/pyspark/ml/param/shared.py | 4 ++-- 8 files changed, 28 insertions(+), 12 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 95f37671e139..9191b3ec4bc2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -67,6 +67,10 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR * This binary classifier optimizes the Hinge Loss using the OWLQN optimizer. * Only supports L2 regularization currently. * + * Since 3.1.0, it supports stacking instances into blocks and using GEMV for + * better performance. + * The block size will be 1.0 MB, if param maxBlockSizeInMB is set 0.0 by default. + * */ @Since("2.2.0") class LinearSVC @Since("2.2.0") ( @@ -154,7 +158,7 @@ class LinearSVC @Since("2.2.0") ( /** * Sets the value of param [[maxBlockSizeInMB]]. - * Default is 0.0. + * Default is 0.0, then 1.0 MB will be chosen. * * @group expertSetParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index db97e24f9259..057196dd67a5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -276,6 +276,10 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas * * This class supports fitting traditional logistic regression model by LBFGS/OWLQN and * bound (box) constrained logistic regression model by LBFGSB. + * + * Since 3.1.0, it supports stacking instances into blocks and using GEMV/GEMM for + * better performance. + * The block size will be 1.0 MB, if param maxBlockSizeInMB is set 0.0 by default. 
*/ @Since("1.2.0") class LogisticRegression @Since("1.2.0") ( @@ -427,7 +431,7 @@ class LogisticRegression @Since("1.2.0") ( /** * Sets the value of param [[maxBlockSizeInMB]]. - * Default is 0.0. + * Default is 0.0, then 1.0 MB will be chosen. * * @group expertSetParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 0640fe355fdd..2f6b9c1e11aa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -111,8 +111,8 @@ private[shared] object SharedParamsCodeGen { isValid = "ParamValidators.gt(0)", isExpertParam = true), ParamDesc[Double]("maxBlockSizeInMB", "Maximum memory in MB for stacking input data " + "into blocks. Data is stacked within partitions. If more than remaining data size in a " + - "partition then it is adjusted to the data size. If 0, try to infer an appropriate " + - "value. Must be >= 0.", + "partition then it is adjusted to the data size. Default 0.0 represents choosing " + + "optimal value, depends on specific algorithm. Must be >= 0.", Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true) ) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 2fbda45a9e97..425bf91fd00b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -570,10 +570,10 @@ trait HasBlockSize extends Params { trait HasMaxBlockSizeInMB extends Params { /** - * Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.. + * Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.. * @group expertParam */ - final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", ParamValidators.gtEq(0.0)) + final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. 
Must be >= 0.", ParamValidators.gtEq(0.0)) setDefault(maxBlockSizeInMB, 0.0) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index fbd9fa24b2f3..4d214dc74ed8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -128,6 +128,10 @@ private[regression] trait AFTSurvivalRegressionParams extends PredictorParams * (see * Accelerated failure time model (Wikipedia)) * based on the Weibull distribution of the survival time. + * + * Since 3.1.0, it supports stacking instances into blocks and using GEMV for + * better performance. + * The block size will be 1.0 MB, if param maxBlockSizeInMB is set 0.0 by default. */ @Since("1.6.0") class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: String) @@ -186,7 +190,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S /** * Sets the value of param [[maxBlockSizeInMB]]. - * Default is 0.0. + * Default is 0.0, then 1.0 MB will be chosen. * * @group expertSetParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 04d185fb443a..11a1984b0ab4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -175,6 +175,10 @@ private[regression] trait LinearRegressionParams extends PredictorParams * $$ * * + * Since 3.1.0, it supports stacking instances into blocks and using GEMV for + * better performance. + * The block size will be 1.0 MB, if param maxBlockSizeInMB is set 0.0 by default. + * * Note: Fitting with huber loss only supports none and L2 regularization. */ @Since("1.3.0") @@ -313,7 +317,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String /** * Sets the value of param [[maxBlockSizeInMB]]. - * Default is 0.0. + * Default is 0.0, then 1.0 MB will be chosen. * * @group expertSetParam */ diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 53d26972c4b4..bcab51f76bd4 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -168,8 +168,8 @@ def get$Name(self): "adjusted to the size of this data.", None, "TypeConverters.toInt"), ("maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is " + "stacked within partitions. If more than remaining data size in a partition then it " + - "is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", - "0.0", "TypeConverters.toFloat")] + "is adjusted to the data size. Default 0.0 represents choosing optimal value, depends " + + "on specific algorithm. Must be >= 0.", "0.0", "TypeConverters.toFloat")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index cbef7386e221..9311e4481e2b 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -601,10 +601,10 @@ def getBlockSize(self): class HasMaxBlockSizeInMB(Params): """ - Mixin for param maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. 
Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0. + Mixin for param maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. """ - maxBlockSizeInMB = Param(Params._dummy(), "maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", typeConverter=TypeConverters.toFloat) + maxBlockSizeInMB = Param(Params._dummy(), "maxBlockSizeInMB", "maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0.", typeConverter=TypeConverters.toFloat) def __init__(self): super(HasMaxBlockSizeInMB, self).__init__() From 317bde95b3454be06e5a053e7cd31bb554e85af6 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 18 Nov 2020 15:21:47 +0800 Subject: [PATCH 4/4] use local variable --- .../ml/optim/aggregator/AFTAggregator.scala | 29 +++++++++---------- .../ml/optim/aggregator/HingeAggregator.scala | 6 ++-- .../ml/optim/aggregator/HuberAggregator.scala | 11 ++++--- .../aggregator/LeastSquaresAggregator.scala | 11 +++---- .../optim/aggregator/LogisticAggregator.scala | 19 +++++++----- 5 files changed, 43 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala index db2f91af6b54..fd59b4b71c41 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala @@ -197,16 +197,12 @@ private[ml] class BlockAFTAggregator( * @return This BlockAFTAggregator object. */ def add(block: InstanceBlock): this.type = { - // here use Instance.weight to store censor for convenience - val (matrix, labels, censors) = (block.matrix, block.labels, block.weightIter.toArray) - require(matrix.isTransposed) - require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " + - s"instance. Expecting $numFeatures but got ${matrix.numCols}.") - require(labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.") - - val size = matrix.numRows - require(labels.length == size && censors.length == size) + require(block.matrix.isTransposed) + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. 
Expecting $numFeatures but got ${block.numFeatures}.") + require(block.labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.") + val size = block.size val intercept = coefficientsArray(dim - 2) // sigma is the scale parameter of the AFT model val sigma = math.exp(coefficientsArray(dim - 1)) @@ -217,26 +213,30 @@ private[ml] class BlockAFTAggregator( } else { Vectors.zeros(size).toDense } - BLAS.gemv(1.0, matrix, linear, 1.0, vec) + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) // in-place convert margins to gradient scales // then, vec represents gradient scales + var localLossSum = 0.0 var i = 0 var sigmaGradSum = 0.0 while (i < size) { - val ti = labels(i) - val delta = censors(i) + val ti = block.getLabel(i) + // here use Instance.weight to store censor for convenience + val delta = block.getWeight(i) val margin = vec(i) val epsilon = (math.log(ti) - margin) / sigma val expEpsilon = math.exp(epsilon) - lossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon + localLossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon val multiplier = (delta - expEpsilon) / sigma vec.values(i) = multiplier sigmaGradSum += delta + multiplier * sigma * epsilon i += 1 } + lossSum += localLossSum + weightSum += size - matrix match { + block.matrix match { case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, vec.values, 1, 1.0, gradientSumArray, 1) @@ -250,7 +250,6 @@ private[ml] class BlockAFTAggregator( if (fitIntercept) gradientSumArray(dim - 2) += vec.values.sum gradientSumArray(dim - 1) += sigmaGradSum - weightSum += size this } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b1990f7c60f6..3d7251256315 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -162,24 +162,26 @@ private[ml] class BlockHingeAggregator( // in-place convert dotProducts to gradient scales // then, vec represents gradient scales + var localLossSum = 0.0 var i = 0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { - weightSum += weight // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) val labelScaled = label + label - 1.0 val loss = (1.0 - labelScaled * vec(i)) * weight if (loss > 0) { - lossSum += loss + localLossSum += loss val gradScale = -labelScaled * weight vec.values(i) = gradScale } else { vec.values(i) = 0.0 } } else { vec.values(i) = 0.0 } i += 1 } + lossSum += localLossSum + weightSum += block.weightIter.sum // predictions are all correct, no gradient signal if (vec.values.forall(_ == 0)) return this diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala index 59ecc038e556..35582dbc990e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala @@ -167,7 +167,6 @@ private[ml] class BlockHuberAggregator( protected override val dim: Int = bcParameters.value.size private val numFeatures = if (fitIntercept) dim - 2 else dim - 1 - private val sigma = bcParameters.value(dim - 1) private val intercept = if (fitIntercept) 
bcParameters.value(dim - 2) else 0.0 // make transient so we do not serialize between aggregation stages @transient private lazy val linear = Vectors.dense(bcParameters.value.toArray.take(numFeatures)) @@ -187,7 +186,9 @@ private[ml] class BlockHuberAggregator( s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") if (block.weightIter.forall(_ == 0)) return this + val size = block.size + val sigma = bcParameters.value(dim - 1) // vec here represents margins or dotProducts val vec = if (fitIntercept) { @@ -200,23 +201,23 @@ private[ml] class BlockHuberAggregator( // in-place convert margins to multipliers // then, vec represents multipliers var sigmaGradSum = 0.0 + var localLossSum = 0.0 var i = 0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { - weightSum += weight val label = block.getLabel(i) val margin = vec(i) val linearLoss = label - margin if (math.abs(linearLoss) <= sigma * epsilon) { - lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma) + localLossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma) val linearLossDivSigma = linearLoss / sigma val multiplier = -1.0 * weight * linearLossDivSigma vec.values(i) = multiplier sigmaGradSum += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0)) } else { - lossSum += 0.5 * weight * + localLossSum += 0.5 * weight * (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon) val sign = if (linearLoss >= 0) -1.0 else 1.0 val multiplier = weight * sign * epsilon @@ -226,6 +227,8 @@ private[ml] class BlockHuberAggregator( } else { vec.values(i) = 0.0 } i += 1 } + lossSum += localLossSum + weightSum += block.weightIter.sum block.matrix match { case dm: DenseMatrix => diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala index fa3bda00d802..d5e1ea980840 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala @@ -267,9 +267,6 @@ private[ml] class BlockLeastSquaresAggregator( val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0 (Vectors.dense(coefficientsArray), offset) } - // do not use tuple assignment above because it will circumvent the @transient tag - @transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1 - @transient private lazy val offset = effectiveCoefAndOffset._2 /** * Add a new training instance block to this BlockLeastSquaresAggregator, and update the loss @@ -286,7 +283,9 @@ private[ml] class BlockLeastSquaresAggregator( s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") if (block.weightIter.forall(_ == 0)) return this + val size = block.size + val (effectiveCoefficientsVec, offset) = effectiveCoefAndOffset // vec here represents diffs val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd)) @@ -294,16 +293,18 @@ private[ml] class BlockLeastSquaresAggregator( // in-place convert diffs to multipliers // then, vec represents multipliers + var localLossSum = 0.0 var i = 0 while (i < size) { val weight = block.getWeight(i) val diff = vec(i) - lossSum += weight * diff * diff / 2 - weightSum += weight + localLossSum += weight * diff * diff / 2 val multiplier = weight * diff vec.values(i) = multiplier i += 1 } + lossSum += localLossSum + weightSum += block.weightIter.sum 
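    // Editor's note (illustrative comment, not part of the patch): as in the other block
    // aggregators touched by patch 4/4, the per-block loss is accumulated in the local
    // `localLossSum` and folded into the class-level `lossSum` once per block, and
    // `weightSum` is updated once from `block.weightIter.sum`, keeping the hot loop free
    // of repeated aggregator-field updates.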
val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index a331122776b5..2496c789f8da 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -466,24 +466,26 @@ private[ml] class BlockLogisticAggregator( // in-place convert margins to multiplier // then, vec represents multiplier + var localLossSum = 0.0 var i = 0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { - weightSum += weight val label = block.getLabel(i) val margin = vec(i) if (label > 0) { // The following is equivalent to log(1 + exp(margin)) but more numerically stable. - lossSum += weight * Utils.log1pExp(margin) + localLossSum += weight * Utils.log1pExp(margin) } else { - lossSum += weight * (Utils.log1pExp(margin) - margin) + localLossSum += weight * (Utils.log1pExp(margin) - margin) } val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) vec.values(i) = multiplier } else { vec.values(i) = 0.0 } i += 1 } + lossSum += localLossSum + weightSum += block.weightIter.sum // predictions are all correct, no gradient signal if (vec.values.forall(_ == 0)) return @@ -514,10 +516,11 @@ private[ml] class BlockLogisticAggregator( // mat here represents margins, shape: S X C val mat = DenseMatrix.zeros(size, numClasses) if (fitIntercept) { + val localCoefficientsArray = coefficientsArray val offset = numClasses * numFeatures var j = 0 while (j < numClasses) { - val intercept = coefficientsArray(offset + j) + val intercept = localCoefficientsArray(offset + j) var i = 0 while (i < size) { mat.update(i, j, intercept); i += 1 } j += 1 @@ -527,13 +530,13 @@ private[ml] class BlockLogisticAggregator( // in-place convert margins to multipliers // then, mat represents multipliers + var localLossSum = 0.0 var i = 0 val tmp = Array.ofDim[Double](numClasses) val interceptGradSumArr = if (fitIntercept) Array.ofDim[Double](numClasses) else null while (i < size) { val weight = block.getWeight(i) if (weight > 0) { - weightSum += weight val label = block.getLabel(i) var maxMargin = Double.NegativeInfinity @@ -566,15 +569,17 @@ private[ml] class BlockLogisticAggregator( } if (maxMargin > 0) { - lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) + localLossSum += weight * (math.log(sum) - marginOfLabel + maxMargin) } else { - lossSum += weight * (math.log(sum) - marginOfLabel) + localLossSum += weight * (math.log(sum) - marginOfLabel) } } else { var j = 0; while (j < numClasses) { mat.update(i, j, 0.0); j += 1 } } i += 1 } + lossSum += localLossSum + weightSum += block.weightIter.sum // mat (multipliers): S X C, dense N // mat.transpose (multipliers): C X S, dense T
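For readers unfamiliar with the blocked-aggregator pattern that patch 4/4 applies across the
Hinge, Huber, LeastSquares and Logistic aggregators, the following is a minimal, self-contained
Scala sketch of the same idea for the binomial case. It is an editor's illustration only: it
uses plain arrays and hand-rolled loops in place of Spark's private InstanceBlock and BLAS
wrappers, and the names (Block, addBlock, log1pExp) are invented for the example.

object BlockedLogisticSketch {

  /** Numerically stable log(1 + exp(x)), mirroring the Utils.log1pExp referenced in the patch. */
  private def log1pExp(x: Double): Double =
    if (x > 0) x + math.log1p(math.exp(-x)) else math.log1p(math.exp(x))

  /** A row-major block of `rows` instances with `cols` features, plus labels and weights. */
  final case class Block(values: Array[Double], rows: Int, cols: Int,
                         labels: Array[Double], weights: Array[Double])

  /**
   * Processes one block: adds the block's gradient contribution into `gradSum`
   * (length = cols) and returns (localLossSum, localWeightSum).
   */
  def addBlock(block: Block, coef: Array[Double], intercept: Double,
               gradSum: Array[Double]): (Double, Double) = {
    // Step 1: margins = X * coef + intercept  (the role BLAS.gemv plays in the patch)
    val margins = Array.fill(block.rows)(intercept)
    var i = 0
    while (i < block.rows) {
      var dot = 0.0
      var j = 0
      while (j < block.cols) { dot += block.values(i * block.cols + j) * coef(j); j += 1 }
      margins(i) += dot
      i += 1
    }

    // Step 2: in-place convert margins to multipliers, accumulating the loss locally
    var localLossSum = 0.0
    var localWeightSum = 0.0
    i = 0
    while (i < block.rows) {
      val w = block.weights(i)
      val label = block.labels(i)
      val margin = margins(i)
      localLossSum += w * (log1pExp(margin) - label * margin)
      localWeightSum += w
      margins(i) = w * (1.0 / (1.0 + math.exp(-margin)) - label)  // multiplier
      i += 1
    }

    // Step 3: gradSum += X^T * multipliers  (the transposed gemv in the patch)
    i = 0
    while (i < block.rows) {
      var j = 0
      while (j < block.cols) {
        gradSum(j) += margins(i) * block.values(i * block.cols + j)
        j += 1
      }
      i += 1
    }

    (localLossSum, localWeightSum)  // caller folds these into lossSum / weightSum once per block
  }
}

The shape of addBlock mirrors the change made throughout the patch: one pass computes margins
for the whole block (the GEMV step), a second pass converts them in place to multipliers while
summing the loss into a local variable, and the per-block totals are applied to the aggregator
state exactly once per block rather than once per instance.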