From 8ad507b4ba662f71420a4438ddde29f5ed2e89eb Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Apr 2020 11:51:52 +0800 Subject: [PATCH 01/11] init init init init init init init init --- .../spark/serializer/KryoSerializer.scala | 1 + .../org/apache/spark/ml/linalg/Matrices.scala | 41 ++++ .../spark/ml/classification/LinearSVC.scala | 221 ++++++++++++------ .../apache/spark/ml/feature/Instance.scala | 91 +++++++- .../ml/optim/aggregator/HingeAggregator.scala | 128 +++++++++- .../org/apache/spark/ml/stat/Summarizer.scala | 5 +- .../ml/classification/LinearSVCSuite.scala | 24 ++ .../spark/ml/feature/InstanceSuite.scala | 31 +++ python/pyspark/ml/classification.py | 20 +- 9 files changed, 477 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index cdaab599e2a0..55ac2c410953 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -502,6 +502,7 @@ private[serializer] object KryoSerializer { "org.apache.spark.ml.attribute.NumericAttribute", "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.InstanceBlock", "org.apache.spark.ml.feature.LabeledPoint", "org.apache.spark.ml.feature.OffsetInstance", "org.apache.spark.ml.linalg.DenseMatrix", diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala index 34e4366701bb..1254ed747aeb 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala @@ -1008,6 +1008,47 @@ object SparseMatrix { @Since("2.0.0") object Matrices { + private[ml] def fromVectors(vectors: Seq[Vector]): Matrix = { + val numRows = vectors.length + val numCols = vectors.head.size + val denseSize = Matrices.getDenseSize(numCols, numRows) + val nnz = vectors.iterator.map(_.numNonzeros).sum + val sparseSize = Matrices.getSparseSize(nnz, numRows + 1) + if (denseSize < sparseSize) { + val values = Array.ofDim[Double](numRows * numCols) + var offset = 0 + var j = 0 + while (j < numRows) { + vectors(j).foreachNonZero { (i, v) => + values(offset + i) = v + } + offset += numCols + j += 1 + } + new DenseMatrix(numRows, numCols, values, true) + } else { + val colIndices = MArrayBuilder.make[Int] + val values = MArrayBuilder.make[Double] + val rowPtrs = MArrayBuilder.make[Int] + var rowPtr = 0 + rowPtrs += 0 + var j = 0 + while (j < numRows) { + var nnz = 0 + vectors(j).foreachNonZero { (i, v) => + colIndices += i + values += v + nnz += 1 + } + rowPtr += nnz + rowPtrs += rowPtr + j += 1 + } + new SparseMatrix(numRows, numCols, rowPtrs.result(), + colIndices.result(), values.result(), true) + } + } + /** * Creates a column-major dense matrix. 
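[Review note: a worked sketch of the size heuristic in the `fromVectors` helper added above; illustrative only, not part of the patch. Roughly, ignoring object headers and alignment, dense storage costs 8 bytes per cell while CSR costs 12 bytes per nonzero plus 4 bytes per row pointer:]

    // Rows (1.0, 0.0, 3.0) and (0.0, 0.0, 5.0): numRows = 2, numCols = 3, nnz = 3.
    val denseBytes  = 8L * 2 * 3             // 48
    val sparseBytes = 12L * 3 + 4L * (2 + 1) // 48
    // denseSize < sparseSize fails, so the CSR branch builds:
    //   values     = [1.0, 3.0, 5.0]
    //   colIndices = [0, 2, 2]
    //   rowPtrs    = [0, 2, 3]  (passed in the colPtrs slot, with isTransposed = true)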
* diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index efe84f89d81a..91e2dc2850c6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.optim.aggregator.HingeAggregator +import org.apache.spark.ml.optim.aggregator._ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.stat._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.storage.StorageLevel /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold { + with HasAggregationDepth with HasThreshold with HasBlockSize { /** * Param for threshold in binary classification prediction. @@ -154,31 +156,54 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set block size for stacking input data in matrices. + * Default is 1. + * + * @group expertSetParam + */ + @Since("3.0.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + setDefault(blockSize -> 1) + @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr => - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - - val instances = extractInstances(dataset) - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - instr.logPipelineStage(this) instr.logDataset(dataset) instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, - regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth) + regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize) - val (summarizer, labelSummarizer) = + val instances = extractInstances(dataset).setName("training instances") + + val (summarizer, labelSummarizer) = if ($(blockSize) == 1) { + if (dataset.storageLevel == StorageLevel.NONE) { + instances.persist(StorageLevel.MEMORY_AND_DISK) + } Summarizer.getClassificationSummarizers(instances, $(aggregationDepth)) - instr.logNumExamples(summarizer.count) - instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) - instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) - instr.logSumOfWeights(summarizer.weightSum) + } else { + // instances will be standardized and converted to blocks, so no need to cache instances. 
+ Summarizer.getClassificationSummarizers(instances, $(aggregationDepth), + Seq("mean", "std", "count", "numNonZeros")) + } val histogram = labelSummarizer.histogram val numInvalid = labelSummarizer.countInvalid val numFeatures = summarizer.mean.size - val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures + + instr.logNumExamples(summarizer.count) + instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString) + instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) + instr.logSumOfWeights(summarizer.weightSum) + if ($(blockSize) > 1) { + val sparsity = 1 - summarizer.numNonzeros.toArray.sum / numFeatures + instr.logNamedValue("sparsity", sparsity.toString) + if (sparsity > 0.5) { + logWarning(s"sparsity of input dataset is $sparsity, " + + s"which may hurt performance in high-level BLAS.") + } + } val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match { case Some(n: Int) => @@ -192,78 +217,122 @@ class LinearSVC @Since("2.2.0") ( instr.logNumClasses(numClasses) instr.logNumFeatures(numFeatures) - val (coefficientVector, interceptVector, objectiveHistory) = { - if (numInvalid != 0) { - val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + - s"Found $numInvalid invalid labels." - instr.logError(msg) - throw new SparkException(msg) - } - - val featuresStd = summarizer.std.toArray - val getFeaturesStd = (j: Int) => featuresStd(j) - val regParamL2 = $(regParam) - val bcFeaturesStd = instances.context.broadcast(featuresStd) - val regularization = if (regParamL2 != 0.0) { - val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures - Some(new L2Regularization(regParamL2, shouldApply, - if ($(standardization)) None else Some(getFeaturesStd))) - } else { - None - } + if (numInvalid != 0) { + val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " + + s"Found $numInvalid invalid labels." + instr.logError(msg) + throw new SparkException(msg) + } - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, - $(aggregationDepth)) + val featuresStd = summarizer.std.toArray + val getFeaturesStd = (j: Int) => featuresStd(j) + val regularization = if ($(regParam) != 0.0) { + val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures + Some(new L2Regularization($(regParam), shouldApply, + if ($(standardization)) None else Some(getFeaturesStd))) + } else None + + def regParamL1Fun = (index: Int) => 0D + val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + + /* + The coefficients are trained in the scaled space; we're converting them back to + the original space. + Note that the intercept in scaled space and original space is the same; + as a result, no scaling is needed. + */ + val state = if ($(blockSize) == 1) { + trainOnRows(instances, featuresStd, regularization, optimizer) + } else { + trainOnBlocks(instances, featuresStd, regularization, optimizer) + } + if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() - def regParamL1Fun = (index: Int) => 0D - val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) - val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) + if (state == null) { + val msg = s"${optimizer.getClass.getName} failed." 
+ instr.logError(msg) + throw new SparkException(msg) + } + val rawCoefficients = state.x.toArray - val states = optimizer.iterations(new CachedDiffFunction(costFun), - initialCoefWithIntercept.asBreeze.toDenseVector) + val coefficientArray = Array.tabulate(numFeatures) { i => + if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0 + } + val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0 + copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept)) + } - val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] - var state: optimizer.State = null - while (states.hasNext) { - state = states.next() - scaledObjectiveHistory += state.adjustedValue - } + private def trainOnRows( + instances: RDD[Instance], + featuresStd: Array[Double], + regularization: Option[L2Regularization], + optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = { + val numFeatures = featuresStd.length + val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures + + val bcFeaturesStd = instances.context.broadcast(featuresStd) + val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + val costFun = new RDDLossFunction(instances, getAggregatorFunc, + regularization, $(aggregationDepth)) + + val states = optimizer.iterations(new CachedDiffFunction(costFun), + Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) + + val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] + var state: optimizer.State = null + while (states.hasNext) { + state = states.next() + scaledObjectiveHistory += state.adjustedValue + } + bcFeaturesStd.destroy() - bcFeaturesStd.destroy() - if (state == null) { - val msg = s"${optimizer.getClass.getName} failed." - instr.logError(msg) - throw new SparkException(msg) - } + state + } - /* - The coefficients are trained in the scaled space; we're converting them back to - the original space. - Note that the intercept in scaled space and original space is the same; - as a result, no scaling is needed. 
- */
-    val rawCoefficients = state.x.toArray
-    val coefficientArray = Array.tabulate(numFeatures) { i =>
-      if (featuresStd(i) != 0.0) {
-        rawCoefficients(i) / featuresStd(i)
-      } else {
-        0.0
+  private def trainOnBlocks(
+      instances: RDD[Instance],
+      featuresStd: Array[Double],
+      regularization: Option[L2Regularization],
+      optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = {
+    val numFeatures = featuresStd.length
+    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
+
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+
+    val standardized = instances.map {
+      case Instance(label, weight, features) =>
+        val featuresStd = bcFeaturesStd.value
+        val array = Array.ofDim[Double](numFeatures)
+        features.foreachNonZero { (i, v) =>
+          val std = featuresStd(i)
+          if (std != 0) array(i) = v / std
         }
-      }
-
-      val intercept = if ($(fitIntercept)) {
-        rawCoefficients(numFeaturesPlusIntercept - 1)
-      } else {
-        0.0
-      }
-      (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
+        Instance(label, weight, Vectors.dense(array))
     }
+    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+      .persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+    val getAggregatorFunc = new BlockHingeAggregator(numFeatures,
+      $(fitIntercept), $(blockSize))(_)
+    val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
+      regularization, $(aggregationDepth))
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+    val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
+    var state: optimizer.State = null
+    while (states.hasNext) {
+      state = states.next()
+      scaledObjectiveHistory += state.adjustedValue
+    }
+    blocks.unpersist()
+    bcFeaturesStd.destroy()
 
-    if (handlePersistence) instances.unpersist()
-
-    copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
+    state
   }
+
 }
 
 @Since("2.2.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 11d0c4689cbb..db5f88d5dddc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg._
+import org.apache.spark.rdd.RDD
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -28,6 +29,94 @@ import org.apache.spark.ml.linalg.Vector
  */
 private[spark] case class Instance(label: Double, weight: Double, features: Vector)
 
+
+/**
+ * Class that represents a block of instances.
+ * If all weights are 1, then an empty weights array is stored.
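+ *
+ * @param labels labels of the instances in this block
+ * @param weights instance weights; an empty array means every weight is 1.0
+ * @param matrix the stacked feature vectors, stored row-major (matrix.isTransposed
+ *               is required to hold)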
+ */ +private[spark] case class InstanceBlock( + labels: Array[Double], + weights: Array[Double], + matrix: Matrix) { + require(labels.length == matrix.numRows) + require(matrix.isTransposed) + if (weights.nonEmpty) { + require(labels.length == weights.length) + } + + def size: Int = labels.length + + def numFeatures: Int = matrix.numCols + + def instanceIterator: Iterator[Instance] = { + if (weights.nonEmpty) { + labels.iterator.zip(weights.iterator).zip(matrix.rowIter) + .map { case ((label, weight), vec) => Instance(label, weight, vec) } + } else { + labels.iterator.zip(matrix.rowIter) + .map { case (label, vec) => Instance(label, 1.0, vec) } + } + } + + def getLabel(i: Int): Double = labels(i) + + def labelIter: Iterator[Double] = labels.iterator + + @transient lazy val getWeight: Int => Double = { + if (weights.nonEmpty) { + (i: Int) => weights(i) + } else { + (i: Int) => 1.0 + } + } + + def weightIter: Iterator[Double] = { + if (weights.nonEmpty) { + weights.iterator + } else { + Iterator.fill(size)(1.0) + } + } + + // directly get the non-zero iterator of i-th row vector without array copy or slice + @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = { + matrix match { + case dm: DenseMatrix => + (i: Int) => + val start = numFeatures * i + Iterator.tabulate(numFeatures)(j => + (j, dm.values(start + j)) + ).filter(_._2 != 0) + case sm: SparseMatrix => + (i: Int) => + val start = sm.colPtrs(i) + val end = sm.colPtrs(i + 1) + Iterator.tabulate(end - start)(j => + (sm.rowIndices(start + j), sm.values(start + j)) + ).filter(_._2 != 0) + } + } +} + +private[spark] object InstanceBlock { + + def fromInstances(instances: Seq[Instance]): InstanceBlock = { + val labels = instances.map(_.label).toArray + val weights = if (instances.exists(_.weight != 1)) { + instances.map(_.weight).toArray + } else { + Array.emptyDoubleArray + } + val matrix = Matrices.fromVectors(instances.map(_.features)) + new InstanceBlock(labels, weights, matrix) + } + + def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = { + instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances)) + } +} + + /** * Case class that represents an instance of data point with * label, weight, offset and features. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index b0906f1b0651..392f1040ff96 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg._ /** @@ -103,3 +103,129 @@ private[ml] class HingeAggregator( } } } + + +/** + * BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in + * binary classification for instances in sparse or dense vector in an online fashion. + * + * Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * NOTE: The feature values are expected to be standardized before computation. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. 
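+ * @param numFeatures The number of features, not counting the intercept.
+ * @param blockSize The expected number of rows per block; a scratch buffer of this
+ *                  size is reused across add() calls for the dot products.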
+ */ +private[ml] class BlockHingeAggregator( + numFeatures: Int, + fitIntercept: Boolean, + blockSize: Int)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] { + + private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + protected override val dim: Int = numFeaturesPlusIntercept + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } + + @transient private lazy val linear = { + if (fitIntercept) { + new DenseVector(coefficientsArray.take(numFeatures)) + } else { + new DenseVector(coefficientsArray) + } + } + + @transient private lazy val intercept = + if (fitIntercept) coefficientsArray(numFeatures) else 0.0 + + @transient private lazy val linearGradSumVec = + if (fitIntercept) new DenseVector(Array.ofDim[Double](numFeatures)) else null + + @transient private lazy val auxiliaryVec = + new DenseVector(Array.ofDim[Double](blockSize)) + + /** + * Add a new training instance block to this HingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param block The InstanceBlock to be added. + * @return This HingeAggregator object. + */ + def add(block: InstanceBlock): this.type = { + require(block.matrix.isTransposed) + require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " + + s"instance. Expecting $numFeatures but got ${block.numFeatures}.") + require(block.weightIter.forall(_ >= 0), + s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0") + + if (block.weightIter.forall(_ == 0)) return this + val size = block.size + val localGradientSumArray = gradientSumArray + + // vec here represents dotProducts + val vec = if (size == blockSize) { + auxiliaryVec + } else { + // the last block within one partition may be of size less than blockSize + new DenseVector(Array.ofDim[Double](size)) + } + + if (fitIntercept) { + var i = 0 + while (i < size) { vec.values(i) = intercept; i += 1 } + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) + } else { + BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) + } + + // in-place convert dotProducts to gradient scales + // then, vec represents gradient scales + var i = 0 + while (i < size) { + val weight = block.getWeight(i) + if (weight > 0) { + weightSum += weight + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val label = block.getLabel(i) + val labelScaled = 2 * label - 1.0 + val loss = (1.0 - labelScaled * vec(i)) * weight + if (loss > 0) { + lossSum += loss + val gradScale = -labelScaled * weight + vec.values(i) = gradScale + } else { + vec.values(i) = 0.0 + } + } else { + vec.values(i) = 0.0 + } + i += 1 + } + + // predictions are all correct, no gradient signal + if (vec.values.forall(_ == 0)) return this + + block.matrix match { + case dm: DenseMatrix => + BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, + vec.values, 1, 1.0, localGradientSumArray, 1) + if (fitIntercept) localGradientSumArray(numFeatures) += vec.values.sum + + case sm: SparseMatrix if fitIntercept => + BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) + linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } + localGradientSumArray(numFeatures) += vec.values.sum + + case sm: 
SparseMatrix if !fitIntercept => + val gradSumVec = new DenseVector(localGradientSumArray) + BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) + } + + this + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala index 1183041b86bb..4230b495fa5d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala @@ -225,9 +225,10 @@ object Summarizer extends Logging { /** Get classification feature and label summarizers for provided data. */ private[ml] def getClassificationSummarizers( instances: RDD[Instance], - aggregationDepth: Int = 2): (SummarizerBuffer, MultiClassSummarizer) = { + aggregationDepth: Int = 2, + requested: Seq[String] = Seq("mean", "std", "count")) = { instances.treeAggregate( - (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))( + (Summarizer.createSummarizerBuffer(requested: _*), new MultiClassSummarizer))( seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) => (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)), combOp = (c1: (SummarizerBuffer, MultiClassSummarizer), diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index c2072cea1185..6740018e78c8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -207,6 +207,30 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { dataset.as[LabeledPoint], estimator, modelEquals, 42L) } + test("LinearSVC on blocks") { + Seq(smallBinaryDataset, smallSparseBinaryDataset).foreach { dataset => + { + val lsvc = new LinearSVC().setFitIntercept(false).setBlockSize(1).setMaxIter(5) + val model = lsvc.fit(dataset) + Seq(2, 4, 8, 16, 32).foreach { blockSize => + val model2 = lsvc.setBlockSize(blockSize).fit(dataset) + assert(model.intercept ~== model2.intercept relTol 1e-9) + assert(model.coefficients ~== model2.coefficients relTol 1e-9) + } + } + + { + val lsvc = new LinearSVC().setFitIntercept(true).setBlockSize(1).setMaxIter(5) + val model = lsvc.fit(dataset) + Seq(2, 4, 8, 16, 32).foreach { blockSize => + val model2 = lsvc.setBlockSize(blockSize).fit(dataset) + assert(model.intercept ~== model2.intercept relTol 1e-9) + assert(model.coefficients ~== model2.coefficients relTol 1e-9) + } + } + } + } + test("prediction on single instance") { val trainer = new LinearSVC() val model = trainer.fit(smallBinaryDataset) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala index 5a7449005839..d780bdf5f5dc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala @@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{ val o2 = ser.deserialize[OffsetInstance](ser.serialize(o)) assert(o === o2) } + + val block1 = InstanceBlock.fromInstances(Seq(instance1)) + val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2)) + Seq(block1, block2).foreach { o => + val o2 = ser.deserialize[InstanceBlock](ser.serialize(o)) + assert(o.labels === o2.labels) + assert(o.weights === o2.weights) + assert(o.matrix === o2.matrix) + } + } + + 
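[Review note: a minimal sketch, not part of the patch, of the weight-compaction behaviour the surrounding tests rely on; it assumes visibility of the private[spark] members, for example from code in the org.apache.spark.ml.feature package, with the suite's existing imports:]

    // All weights are exactly 1.0, so fromInstances stores an empty weights
    // array and getWeight falls back to the constant 1.0.
    val unweighted = InstanceBlock.fromInstances(Seq(
      Instance(0.0, 1.0, Vectors.dense(1.0, 2.0)),
      Instance(1.0, 1.0, Vectors.dense(3.0, 4.0))))
    assert(unweighted.weights.isEmpty && unweighted.getWeight(1) == 1.0)

    // A single non-unit weight forces the full weights array to be kept.
    val weighted = InstanceBlock.fromInstances(Seq(
      Instance(0.0, 2.0, Vectors.dense(1.0, 2.0))))
    assert(weighted.weights.sameElements(Array(2.0)))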
test("InstanceBlock: check correctness") { + val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)) + val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse) + val instances = Seq(instance1, instance2) + + val block = InstanceBlock.fromInstances(instances) + assert(block.size === 2) + assert(block.numFeatures === 2) + block.instanceIterator.zipWithIndex.foreach { + case (instance, i) => + assert(instance.label === instances(i).label) + assert(instance.weight === instances(i).weight) + assert(instance.features.toArray === instances(i).features.toArray) + } + Seq(0, 1).foreach { i => + val nzIter = block.getNonZeroIter(i) + val vec = Vectors.sparse(2, nzIter.toSeq) + assert(vec.toArray === instances(i).features.toArray) + } } + } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0d88aa8f17f9..b5a14d7f773e 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -290,6 +290,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl LinearSVCModel... >>> model.getThreshold() 0.5 + >>> model.getBlockSize() + 1 >>> model.coefficients DenseVector([0.0, -0.2792, -0.1833]) >>> model.intercept @@ -328,18 +330,19 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ super(LinearSVC, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.classification.LinearSVC", self.uid) self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True, - standardization=True, threshold=0.0, aggregationDepth=2) + standardization=True, threshold=0.0, aggregationDepth=2, + blockSize=1) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -348,12 +351,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, - aggregationDepth=2): + aggregationDepth=2, blockSize=1): """ setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \ fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \ - aggregationDepth=2): + aggregationDepth=2, blockSize=1): Sets params for Linear SVM Classifier. """ kwargs = self._input_kwargs @@ -418,6 +421,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.0.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. 
+ """ + return self._set(blockSize=value) + class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable): """ From 21fe97c73308567b5db0bc0cfb7b768bcb21f193 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Apr 2020 17:03:19 +0800 Subject: [PATCH 02/11] update version update version --- .../scala/org/apache/spark/ml/classification/LinearSVC.scala | 5 +++-- python/pyspark/ml/classification.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 91e2dc2850c6..2dce695c4fc6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -162,7 +162,7 @@ class LinearSVC @Since("2.2.0") ( * * @group expertSetParam */ - @Since("3.0.0") + @Since("3.1.0") def setBlockSize(value: Int): this.type = set(blockSize, value) setDefault(blockSize -> 1) @@ -197,7 +197,8 @@ class LinearSVC @Since("2.2.0") ( instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString) instr.logSumOfWeights(summarizer.weightSum) if ($(blockSize) > 1) { - val sparsity = 1 - summarizer.numNonzeros.toArray.sum / numFeatures + val scale = 1.0 / summarizer.count / numFeatures + val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum instr.logNamedValue("sparsity", sparsity.toString) if (sparsity > 0.5) { logWarning(s"sparsity of input dataset is $sparsity, " + diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index b5a14d7f773e..3abd20dc1563 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -421,7 +421,7 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) - @since("3.0.0") + @since("3.1.0") def setBlockSize(self, value): """ Sets the value of :py:attr:`blockSize`. From f969e4ab48260e50708c8bb4000d8dc79bdb1ce8 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Apr 2020 17:51:13 +0800 Subject: [PATCH 03/11] update doc --- .../ml/optim/aggregator/HingeAggregator.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 392f1040ff96..4568cedb1e55 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -107,7 +107,7 @@ private[ml] class HingeAggregator( /** * BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in - * binary classification for instances in sparse or dense vector in an online fashion. + * binary classification for blocks in sparse or dense matrix in an online fashion. * * Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of * the corresponding joint dataset. 
@@ -166,24 +166,25 @@ private[ml] class BlockHingeAggregator( val size = block.size val localGradientSumArray = gradientSumArray - // vec here represents dotProducts + // vec/arr here represents dotProducts val vec = if (size == blockSize) { auxiliaryVec } else { // the last block within one partition may be of size less than blockSize new DenseVector(Array.ofDim[Double](size)) } + val arr = vec.values if (fitIntercept) { var i = 0 - while (i < size) { vec.values(i) = intercept; i += 1 } + while (i < size) { arr(i) = intercept; i += 1 } BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) } // in-place convert dotProducts to gradient scales - // then, vec represents gradient scales + // then, vec/arr represents gradient scales var i = 0 while (i < size) { val weight = block.getWeight(i) @@ -193,33 +194,33 @@ private[ml] class BlockHingeAggregator( // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) val labelScaled = 2 * label - 1.0 - val loss = (1.0 - labelScaled * vec(i)) * weight + val loss = (1.0 - labelScaled * arr(i)) * weight if (loss > 0) { lossSum += loss val gradScale = -labelScaled * weight - vec.values(i) = gradScale + arr(i) = gradScale } else { - vec.values(i) = 0.0 + arr(i) = 0.0 } } else { - vec.values(i) = 0.0 + arr(i) = 0.0 } i += 1 } // predictions are all correct, no gradient signal - if (vec.values.forall(_ == 0)) return this + if (arr.forall(_ == 0)) return this block.matrix match { case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, - vec.values, 1, 1.0, localGradientSumArray, 1) - if (fitIntercept) localGradientSumArray(numFeatures) += vec.values.sum + arr, 1, 1.0, localGradientSumArray, 1) + if (fitIntercept) localGradientSumArray(numFeatures) += arr.sum case sm: SparseMatrix if fitIntercept => BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } - localGradientSumArray(numFeatures) += vec.values.sum + localGradientSumArray(numFeatures) += arr.sum case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(localGradientSumArray) From 68dbc35c4ab39a9ef2d1175a2b6a03efe2285608 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sun, 26 Apr 2020 19:00:37 +0800 Subject: [PATCH 04/11] fix py --- python/pyspark/ml/classification.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 3abd20dc1563..318ae7ac5998 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -241,7 +241,8 @@ def predictProbability(self, value): class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol, - HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold): + HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold, + HasBlockSize): """ Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`. 
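[Review note: by this point in the series the feature is wired through both the Scala and Python APIs. A minimal end-to-end sketch, illustrative only and not part of any patch; `training` stands for any DataFrame with "label" and "features" columns:]

    import org.apache.spark.ml.classification.LinearSVC

    // blockSize = 1 keeps the original per-row HingeAggregator path;
    // blockSize > 1 stacks rows into InstanceBlocks and drives the
    // gemv-based BlockHingeAggregator instead.
    val svc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      .setBlockSize(64)
    val model = svc.fit(training)
    println(s"coefficients=${model.coefficients} intercept=${model.intercept}")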
From 5e6e5e71de9683229bfc9dbed5361fd89920135f Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 27 Apr 2020 17:26:24 +0800 Subject: [PATCH 05/11] nit --- .../ml/optim/aggregator/HingeAggregator.scala | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 4568cedb1e55..8b21ba7bb158 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -39,8 +39,8 @@ private[ml] class HingeAggregator( fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[Instance, HingeAggregator] { - private val numFeatures: Int = bcFeaturesStd.value.length - private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + private val numFeatures = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + @@ -131,22 +131,19 @@ private[ml] class BlockHingeAggregator( s" but got type ${bcCoefficients.value.getClass}.") } - @transient private lazy val linear = { - if (fitIntercept) { - new DenseVector(coefficientsArray.take(numFeatures)) - } else { - new DenseVector(coefficientsArray) - } + @transient private lazy val linear = if (fitIntercept) { + Vectors.dense(coefficientsArray.take(numFeatures)).toDense + } else { + Vectors.dense(coefficientsArray).toDense } @transient private lazy val intercept = if (fitIntercept) coefficientsArray(numFeatures) else 0.0 @transient private lazy val linearGradSumVec = - if (fitIntercept) new DenseVector(Array.ofDim[Double](numFeatures)) else null + if (fitIntercept) Vectors.zeros(numFeatures).toDense else null - @transient private lazy val auxiliaryVec = - new DenseVector(Array.ofDim[Double](blockSize)) + @transient private lazy val auxiliaryVec = Vectors.zeros(blockSize).toDense /** * Add a new training instance block to this HingeAggregator, and update the loss and gradient @@ -167,15 +164,10 @@ private[ml] class BlockHingeAggregator( val localGradientSumArray = gradientSumArray // vec/arr here represents dotProducts - val vec = if (size == blockSize) { - auxiliaryVec - } else { - // the last block within one partition may be of size less than blockSize - new DenseVector(Array.ofDim[Double](size)) - } + val vec = if (size == blockSize) auxiliaryVec else Vectors.zeros(size).toDense val arr = vec.values - if (fitIntercept) { + if (fitIntercept && intercept != 0) { var i = 0 while (i < size) { arr(i) = intercept; i += 1 } BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) @@ -193,7 +185,7 @@ private[ml] class BlockHingeAggregator( // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) - val labelScaled = 2 * label - 1.0 + val labelScaled = label + label - 1.0 val loss = (1.0 - labelScaled * arr(i)) * weight if (loss > 0) { lossSum += loss From a74da0c0dfc8b914dafd630fa60bfdc8913a6235 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Mon, 27 Apr 2020 17:43:35 +0800 Subject: [PATCH 06/11] nit nit nit --- .../spark/ml/classification/LinearSVC.scala | 14 
++++++-------- .../ml/optim/aggregator/HingeAggregator.scala | 16 +++++++--------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 2dce695c4fc6..6bc713067571 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -242,19 +242,18 @@ class LinearSVC @Since("2.2.0") ( Note that the intercept in scaled space and original space is the same; as a result, no scaling is needed. */ - val state = if ($(blockSize) == 1) { + val rawCoefficients = if ($(blockSize) == 1) { trainOnRows(instances, featuresStd, regularization, optimizer) } else { trainOnBlocks(instances, featuresStd, regularization, optimizer) } if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() - if (state == null) { + if (rawCoefficients == null) { val msg = s"${optimizer.getClass.getName} failed." instr.logError(msg) throw new SparkException(msg) } - val rawCoefficients = state.x.toArray val coefficientArray = Array.tabulate(numFeatures) { i => if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0 @@ -267,7 +266,7 @@ class LinearSVC @Since("2.2.0") ( instances: RDD[Instance], featuresStd: Array[Double], regularization: Option[L2Regularization], - optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = { + optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = { val numFeatures = featuresStd.length val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures @@ -287,14 +286,14 @@ class LinearSVC @Since("2.2.0") ( } bcFeaturesStd.destroy() - state + if (state == null) null else state.x.toArray } private def trainOnBlocks( instances: RDD[Instance], featuresStd: Array[Double], regularization: Option[L2Regularization], - optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = { + optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = { val numFeatures = featuresStd.length val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures @@ -331,9 +330,8 @@ class LinearSVC @Since("2.2.0") ( blocks.unpersist() bcFeaturesStd.destroy() - state + if (state == null) null else state.x.toArray } - } @Since("2.2.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 8b21ba7bb158..c30223c731f2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -138,7 +138,7 @@ private[ml] class BlockHingeAggregator( } @transient private lazy val intercept = - if (fitIntercept) coefficientsArray(numFeatures) else 0.0 + if (fitIntercept) coefficientsArray.last else 0.0 @transient private lazy val linearGradSumVec = if (fitIntercept) Vectors.zeros(numFeatures).toDense else null @@ -161,15 +161,13 @@ private[ml] class BlockHingeAggregator( if (block.weightIter.forall(_ == 0)) return this val size = block.size - val localGradientSumArray = gradientSumArray // vec/arr here represents dotProducts val vec = if (size == blockSize) auxiliaryVec else Vectors.zeros(size).toDense val arr = vec.values if (fitIntercept && intercept != 0) { - var i = 0 - while (i < size) { arr(i) = intercept; i += 1 } + java.util.Arrays.fill(arr, intercept) 
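        // Review note, not part of the patch: pre-filling `arr` with the
        // intercept and then calling gemv with beta = 1.0 computes, in a
        // single BLAS call,
        //   vec = 1.0 * block.matrix * linear + 1.0 * vec
        // that is, the per-row margins X * w + b for the whole block.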
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) } else { BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) @@ -206,16 +204,16 @@ private[ml] class BlockHingeAggregator( block.matrix match { case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, - arr, 1, 1.0, localGradientSumArray, 1) - if (fitIntercept) localGradientSumArray(numFeatures) += arr.sum + arr, 1, 1.0, gradientSumArray, 1) + if (fitIntercept) gradientSumArray(numFeatures) += arr.sum case sm: SparseMatrix if fitIntercept => BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) - linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v } - localGradientSumArray(numFeatures) += arr.sum + BLAS.nativeBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) + gradientSumArray(numFeatures) += arr.sum case sm: SparseMatrix if !fitIntercept => - val gradSumVec = new DenseVector(localGradientSumArray) + val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) } From f8fbab76423fe74e5871a1fbc5f42889ef72af91 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Tue, 28 Apr 2020 13:38:47 +0800 Subject: [PATCH 07/11] nit nit --- .../spark/ml/classification/LinearSVC.scala | 23 +++---- .../ml/classification/LinearSVCSuite.scala | 29 +++------ .../aggregator/HingeAggregatorSuite.scala | 62 ++++++++++++++++++- 3 files changed, 83 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 6bc713067571..1ee4133968b2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -175,7 +175,8 @@ class LinearSVC @Since("2.2.0") ( instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol, regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize) - val instances = extractInstances(dataset).setName("training instances") + val instances = extractInstances(dataset) + .setName("training instances") val (summarizer, labelSummarizer) = if ($(blockSize) == 1) { if (dataset.storageLevel == StorageLevel.NONE) { @@ -201,7 +202,7 @@ class LinearSVC @Since("2.2.0") ( val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum instr.logNamedValue("sparsity", sparsity.toString) if (sparsity > 0.5) { - logWarning(s"sparsity of input dataset is $sparsity, " + + instr.logWarning(s"sparsity of input dataset is $sparsity, " + s"which may hurt performance in high-level BLAS.") } } @@ -242,7 +243,7 @@ class LinearSVC @Since("2.2.0") ( Note that the intercept in scaled space and original space is the same; as a result, no scaling is needed. 
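       Review note: concretely, training sees features x_j / std_j, so a scaled
       coefficient w'_j corresponds to w_j = w'_j / std_j in the original space;
       that is exactly the division performed when building coefficientArray.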
*/ - val rawCoefficients = if ($(blockSize) == 1) { + val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) { trainOnRows(instances, featuresStd, regularization, optimizer) } else { trainOnBlocks(instances, featuresStd, regularization, optimizer) @@ -266,7 +267,7 @@ class LinearSVC @Since("2.2.0") ( instances: RDD[Instance], featuresStd: Array[Double], regularization: Option[L2Regularization], - optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = { + optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { val numFeatures = featuresStd.length val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures @@ -278,22 +279,22 @@ class LinearSVC @Since("2.2.0") ( val states = optimizer.iterations(new CachedDiffFunction(costFun), Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) - val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] + val arrayBuilder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { state = states.next() - scaledObjectiveHistory += state.adjustedValue + arrayBuilder += state.adjustedValue } bcFeaturesStd.destroy() - if (state == null) null else state.x.toArray + (if (state == null) null else state.x.toArray, arrayBuilder.result) } private def trainOnBlocks( instances: RDD[Instance], featuresStd: Array[Double], regularization: Option[L2Regularization], - optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = { + optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = { val numFeatures = featuresStd.length val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures @@ -321,16 +322,16 @@ class LinearSVC @Since("2.2.0") ( val states = optimizer.iterations(new CachedDiffFunction(costFun), Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector) - val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double] + val arrayBuilder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { state = states.next() - scaledObjectiveHistory += state.adjustedValue + arrayBuilder += state.adjustedValue } blocks.unpersist() bcFeaturesStd.destroy() - if (state == null) null else state.x.toArray + (if (state == null) null else state.x.toArray, arrayBuilder.result) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 6740018e78c8..579d6b12ab99 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -208,25 +208,16 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest { } test("LinearSVC on blocks") { - Seq(smallBinaryDataset, smallSparseBinaryDataset).foreach { dataset => - { - val lsvc = new LinearSVC().setFitIntercept(false).setBlockSize(1).setMaxIter(5) - val model = lsvc.fit(dataset) - Seq(2, 4, 8, 16, 32).foreach { blockSize => - val model2 = lsvc.setBlockSize(blockSize).fit(dataset) - assert(model.intercept ~== model2.intercept relTol 1e-9) - assert(model.coefficients ~== model2.coefficients relTol 1e-9) - } - } - - { - val lsvc = new LinearSVC().setFitIntercept(true).setBlockSize(1).setMaxIter(5) - val model = lsvc.fit(dataset) - Seq(2, 4, 8, 16, 32).foreach { blockSize => - val model2 = lsvc.setBlockSize(blockSize).fit(dataset) - assert(model.intercept ~== model2.intercept relTol 1e-9) - assert(model.coefficients ~== 
model2.coefficients relTol 1e-9) - } + for (dataset <- Seq(smallBinaryDataset, smallSparseBinaryDataset); + fitIntercept <- Seq(true, false)) { + val lsvc = new LinearSVC() + .setFitIntercept(fitIntercept) + .setMaxIter(5) + val model = lsvc.fit(dataset) + Seq(4, 16, 64).foreach { blockSize => + val model2 = lsvc.setBlockSize(blockSize).fit(dataset) + assert(model.intercept ~== model2.intercept relTol 1e-9) + assert(model.coefficients ~== model2.coefficients relTol 1e-9) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 16d27a995bc6..b27c13ced80c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.stat.Summarizer import org.apache.spark.ml.util.TestingUtils._ @@ -61,6 +61,20 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) } + /** Get summary statistics for some data and create a new BlockHingeAggregator. */ + private def getNewBlockAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean, + blockSize: Int): BlockHingeAggregator = { + val (featuresSummarizer, ySummarizer) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val featuresStd = featuresSummarizer.std.toArray + val numFeatures = featuresStd.length + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new BlockHingeAggregator(numFeatures, fitIntercept, blockSize)(bcCoefficients) + } + test("aggregator add method input size") { val coefArray = Array(1.0, 2.0) val interceptArray = Array(2.0) @@ -159,4 +173,50 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("Block HingeAggregator") { + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + val blocks1 = instances + .grouped(2) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + val blocks3 = blocks1.zipWithIndex.map { case (block, i) => + if (i % 2 == 0) { + new InstanceBlock(block.labels, block.weights, block.matrix.toDense) + } else { + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + } + + val agg1 = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, blockSize = 1) + blocks1.foreach(agg1.add) + val loss1 = agg1.loss + val grad1 = agg1.gradient + for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) { + val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, blockSize = blockSize) + blocks.foreach(agg.add) + assert(loss1 ~== agg.loss relTol 1e-9) + assert(grad1 ~== agg.gradient relTol 1e-9) + } + + val agg2 = getNewBlockAggregator(instances, Vectors.dense(coefArray), + fitIntercept = false, blockSize = 1) + 
blocks1.foreach(agg2.add) + val loss2 = agg2.loss + val grad2 = agg2.gradient + for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) { + val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray), + fitIntercept = false, blockSize = blockSize) + blocks.foreach(agg.add) + assert(loss2 ~== agg.loss relTol 1e-9) + assert(grad2 ~== agg.gradient relTol 1e-9) + } + } } From 58c0a1ea7be1e2ee798e2d8861da305d08d5debc Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Wed, 29 Apr 2020 19:01:00 +0800 Subject: [PATCH 08/11] nit --- .../org/apache/spark/ml/optim/aggregator/HingeAggregator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index c30223c731f2..f8caa6904f96 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -209,7 +209,8 @@ private[ml] class BlockHingeAggregator( case sm: SparseMatrix if fitIntercept => BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) - BLAS.nativeBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) + BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, + gradientSumArray, 1) gradientSumArray(numFeatures) += arr.sum case sm: SparseMatrix if !fitIntercept => From 4ed1227a01077633cc6387148724e8be10c2d04e Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 30 Apr 2020 10:28:09 +0800 Subject: [PATCH 09/11] address comments --- .../spark/ml/classification/LinearSVC.scala | 30 +++++++++++-------- .../ml/optim/aggregator/HingeAggregator.scala | 4 +-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 1ee4133968b2..0f9d46b0c4e3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.{Instance, InstanceBlock} +import org.apache.spark.ml.feature._ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.optim.aggregator._ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} @@ -158,6 +158,15 @@ class LinearSVC @Since("2.2.0") ( /** * Set block size for stacking input data in matrices. + * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; + * If blockSize > 1, then points will be stacked to blocks, and high-level BLAS routines will + * be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). + * An appropriate choice of the block size depends on the dim and sparsity of input datasets, + * the underlying BLAS implementation (for example, f2jBLAS, OpenBLAS, intel MKL) and its + * configuration (for example, number of threads). + * Note that existing BLAS implementations are mainly optimized for dense matrices, if the + * input dataset is sparse, there maybe no performance gain, the worse is that performance + * regression may occur. * Default is 1. 
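+   * For example (illustrative sizes), with blockSize = 64 and 100-dimensional
+   * dense features, each group of 64 consecutive rows is stacked into a 64 x 100
+   * row-major matrix and all 64 margins are computed by a single GEMV call.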
* * @group expertSetParam @@ -234,7 +243,7 @@ class LinearSVC @Since("2.2.0") ( if ($(standardization)) None else Some(getFeaturesStd))) } else None - def regParamL1Fun = (index: Int) => 0D + def regParamL1Fun = (index: Int) => 0.0 val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) /* @@ -287,7 +296,7 @@ class LinearSVC @Since("2.2.0") ( } bcFeaturesStd.destroy() - (if (state == null) null else state.x.toArray, arrayBuilder.result) + (if (state != null) state.x.toArray else null, arrayBuilder.result) } private def trainOnBlocks( @@ -300,15 +309,10 @@ class LinearSVC @Since("2.2.0") ( val bcFeaturesStd = instances.context.broadcast(featuresStd) - val standardized = instances.map { - case Instance(label, weight, features) => - val featuresStd = bcFeaturesStd.value - val array = Array.ofDim[Double](numFeatures) - features.foreachNonZero { (i, v) => - val std = featuresStd(i) - if (std != 0) array(i) = v / std - } - Instance(label, weight, Vectors.dense(array)) + val standardized = instances.mapPartitions { iter => + val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 } + val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true) + iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) } } val blocks = InstanceBlock.blokify(standardized, $(blockSize)) .persist(StorageLevel.MEMORY_AND_DISK) @@ -331,7 +335,7 @@ class LinearSVC @Since("2.2.0") ( blocks.unpersist() bcFeaturesStd.destroy() - (if (state == null) null else state.x.toArray, arrayBuilder.result) + (if (state != null) state.x.toArray else null, arrayBuilder.result) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index f8caa6904f96..4b5080b22cc4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ -123,13 +123,13 @@ private[ml] class BlockHingeAggregator( blockSize: Int)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] { - private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures - protected override val dim: Int = numFeaturesPlusIntercept + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + s" but got type ${bcCoefficients.value.getClass}.") } + protected override val dim: Int = numFeaturesPlusIntercept @transient private lazy val linear = if (fitIntercept) { Vectors.dense(coefficientsArray.take(numFeatures)).toDense From e02a86e3182028a0f1278079eb9c6cfa8d73f916 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 30 Apr 2020 11:01:16 +0800 Subject: [PATCH 10/11] fix doc --- .../apache/spark/ml/classification/LinearSVC.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 0f9d46b0c4e3..f5e5e3bff729 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -159,14 +159,14 @@ class LinearSVC @Since("2.2.0") (
   /**
    * Set block size for stacking input data in matrices.
    * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
-   * If blockSize > 1, then points will be stacked to blocks, and high-level BLAS routines will
-   * be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
-   * An appropriate choice of the block size depends on the dim and sparsity of input datasets,
-   * the underlying BLAS implementation (for example, f2jBLAS, OpenBLAS, intel MKL) and its
-   * configuration (for example, number of threads).
+   * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
+   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
+   * on the dimension and sparsity of the input dataset, the underlying BLAS implementation
+   * (for example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (e.g. number of threads).
    * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
-   * input dataset is sparse, there maybe no performance gain, the worse is that performance
-   * regression may occur.
+   * input dataset is sparse, stacking may bring no performance gain; worse, a performance
+   * regression may occur.
    * Default is 1.
    *
    * @group expertSetParam

From e8abb4ba6b3b03cc0a696dcf43ee5ede109f88ea Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Mon, 4 May 2020 21:24:02 +0800
Subject: [PATCH 11/11] remove some transient lazy variables

---
 .../spark/ml/classification/LinearSVC.scala   |  3 +-
 .../ml/optim/aggregator/HingeAggregator.scala | 62 +++++-------
 .../aggregator/HingeAggregatorSuite.scala     | 96 ++++++++-----------
 3 files changed, 64 insertions(+), 97 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index f5e5e3bff729..69c35a8a80f5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -318,8 +318,7 @@ class LinearSVC @Since("2.2.0") (
       .persist(StorageLevel.MEMORY_AND_DISK)
       .setName(s"training dataset (blockSize=${$(blockSize)})")

-    val getAggregatorFunc = new BlockHingeAggregator(numFeatures,
-      $(fitIntercept), $(blockSize))(_)
+    val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
     val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
       $(aggregationDepth))

diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index 4b5080b22cc4..1525bb9bfcbb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -118,33 +118,23 @@ private[ml] class BlockHingeAggregator(
  * @param fitIntercept Whether to fit an intercept term.
*/ private[ml] class BlockHingeAggregator( - numFeatures: Int, - fitIntercept: Boolean, - blockSize: Int)(bcCoefficients: Broadcast[Vector]) + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] { - private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + protected override val dim: Int = bcCoefficients.value.size + private val numFeatures = if (fitIntercept) dim - 1 else dim + @transient private lazy val coefficientsArray = bcCoefficients.value match { case DenseVector(values) => values case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + s" but got type ${bcCoefficients.value.getClass}.") } - protected override val dim: Int = numFeaturesPlusIntercept - @transient private lazy val linear = if (fitIntercept) { - Vectors.dense(coefficientsArray.take(numFeatures)).toDense - } else { - Vectors.dense(coefficientsArray).toDense + @transient private lazy val linear = { + val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray + Vectors.dense(linear).toDense } - @transient private lazy val intercept = - if (fitIntercept) coefficientsArray.last else 0.0 - - @transient private lazy val linearGradSumVec = - if (fitIntercept) Vectors.zeros(numFeatures).toDense else null - - @transient private lazy val auxiliaryVec = Vectors.zeros(blockSize).toDense - /** * Add a new training instance block to this HingeAggregator, and update the loss and gradient * of the objective function. @@ -162,20 +152,18 @@ private[ml] class BlockHingeAggregator( if (block.weightIter.forall(_ == 0)) return this val size = block.size - // vec/arr here represents dotProducts - val vec = if (size == blockSize) auxiliaryVec else Vectors.zeros(size).toDense - val arr = vec.values - - if (fitIntercept && intercept != 0) { - java.util.Arrays.fill(arr, intercept) - BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) + // vec here represents dotProducts + val vec = if (fitIntercept) { + Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense } else { - BLAS.gemv(1.0, block.matrix, linear, 0.0, vec) + Vectors.zeros(size).toDense } + BLAS.gemv(1.0, block.matrix, linear, 1.0, vec) // in-place convert dotProducts to gradient scales - // then, vec/arr represents gradient scales + // then, vec represents gradient scales var i = 0 + var interceptGradSum = 0.0 while (i < size) { val weight = block.getWeight(i) if (weight > 0) { @@ -184,34 +172,32 @@ private[ml] class BlockHingeAggregator( // Therefore the gradient is -(2y - 1)*x val label = block.getLabel(i) val labelScaled = label + label - 1.0 - val loss = (1.0 - labelScaled * arr(i)) * weight + val loss = (1.0 - labelScaled * vec.values(i)) * weight if (loss > 0) { lossSum += loss val gradScale = -labelScaled * weight - arr(i) = gradScale - } else { - arr(i) = 0.0 - } - } else { - arr(i) = 0.0 - } + vec.values(i) = gradScale + if (fitIntercept) interceptGradSum += gradScale + } else { vec.values(i) = 0.0 } + } else { vec.values(i) = 0.0 } i += 1 } // predictions are all correct, no gradient signal - if (arr.forall(_ == 0)) return this + if (vec.values.forall(_ == 0)) return this block.matrix match { case dm: DenseMatrix => BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols, - arr, 1, 1.0, gradientSumArray, 1) - if (fitIntercept) gradientSumArray(numFeatures) += arr.sum + vec.values, 1, 1.0, gradientSumArray, 1) + if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum 
case sm: SparseMatrix if fitIntercept => + val linearGradSumVec = Vectors.zeros(numFeatures).toDense BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec) BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1, gradientSumArray, 1) - gradientSumArray(numFeatures) += arr.sum + gradientSumArray(numFeatures) += interceptGradSum case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(gradientSumArray) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index b27c13ced80c..51a1edd315e5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -28,6 +28,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { @transient var instances: Array[Instance] = _ @transient var instancesConstantFeature: Array[Instance] = _ @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + @transient var standardizedInstances: Array[Instance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -46,6 +47,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { Instance(1.0, 0.5, Vectors.dense(1.0)), Instance(2.0, 0.3, Vectors.dense(0.5)) ) + standardizedInstances = standardize(instances) } /** Get summary statistics for some data and create a new HingeAggregator. */ @@ -61,18 +63,27 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) } + private def standardize(instances: Array[Instance]): Array[Instance] = { + val (featuresSummarizer, _) = + Summarizer.getClassificationSummarizers(sc.parallelize(instances)) + val stdArray = featuresSummarizer.std.toArray + val numFeatures = stdArray.length + instances.map { case Instance(label, weight, features) => + val standardized = Array.ofDim[Double](numFeatures) + features.foreachNonZero { (i, v) => + val std = stdArray(i) + if (std != 0) standardized(i) = v / std + } + Instance(label, weight, Vectors.dense(standardized).compressed) + } + } + /** Get summary statistics for some data and create a new BlockHingeAggregator. 
*/ private def getNewBlockAggregator( - instances: Array[Instance], coefficients: Vector, - fitIntercept: Boolean, - blockSize: Int): BlockHingeAggregator = { - val (featuresSummarizer, ySummarizer) = - Summarizer.getClassificationSummarizers(sc.parallelize(instances)) - val featuresStd = featuresSummarizer.std.toArray - val numFeatures = featuresStd.length + fitIntercept: Boolean): BlockHingeAggregator = { val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new BlockHingeAggregator(numFeatures, fitIntercept, blockSize)(bcCoefficients) + new BlockHingeAggregator(fitIntercept)(bcCoefficients) } test("aggregator add method input size") { @@ -153,8 +164,26 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { } val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum)) - assert(loss ~== agg.loss relTol 0.01) - assert(gradient ~== agg.gradient relTol 0.01) + assert(loss ~== agg.loss relTol 1e-9) + assert(gradient ~== agg.gradient relTol 1e-9) + + Seq(1, 2, 4).foreach { blockSize => + val blocks1 = standardizedInstances + .grouped(blockSize) + .map(seq => InstanceBlock.fromInstances(seq)) + .toArray + val blocks2 = blocks1.map { block => + new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) + } + + Seq(blocks1, blocks2).foreach { blocks => + val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + blocks.foreach(blockAgg.add) + assert(loss ~== blockAgg.loss relTol 1e-9) + assert(gradient ~== blockAgg.gradient relTol 1e-9) + } + } } test("check with zero standard deviation") { @@ -172,51 +201,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(0) === 0.0) assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } - - test("Block HingeAggregator") { - val coefArray = Array(1.0, 2.0) - val intercept = 1.0 - val blocks1 = instances - .grouped(2) - .map(seq => InstanceBlock.fromInstances(seq)) - .toArray - - val blocks2 = blocks1.map { block => - new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) - } - - val blocks3 = blocks1.zipWithIndex.map { case (block, i) => - if (i % 2 == 0) { - new InstanceBlock(block.labels, block.weights, block.matrix.toDense) - } else { - new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor) - } - } - - val agg1 = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true, blockSize = 1) - blocks1.foreach(agg1.add) - val loss1 = agg1.loss - val grad1 = agg1.gradient - for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) { - val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true, blockSize = blockSize) - blocks.foreach(agg.add) - assert(loss1 ~== agg.loss relTol 1e-9) - assert(grad1 ~== agg.gradient relTol 1e-9) - } - - val agg2 = getNewBlockAggregator(instances, Vectors.dense(coefArray), - fitIntercept = false, blockSize = 1) - blocks1.foreach(agg2.add) - val loss2 = agg2.loss - val grad2 = agg2.gradient - for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) { - val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray), - fitIntercept = false, blockSize = blockSize) - blocks.foreach(agg.add) - assert(loss2 ~== agg.loss relTol 1e-9) - assert(grad2 ~== agg.gradient relTol 1e-9) - } - } }
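
The main user-facing change in this series is the new blockSize expert param on
LinearSVC (mirrored in the PySpark API per the diffstat). A minimal usage sketch,
assuming a live SparkSession `spark` and a libsvm-format dataset path, both of
which are hypothetical placeholders:

    import org.apache.spark.ml.classification.LinearSVC

    val training = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

    val svc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      .setBlockSize(64)  // > 1 enables stacking; the default of 1 keeps the per-vector path

    val model = svc.fit(training)
    println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")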
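Patch 09 reworks trainOnBlocks to standardize instances once up front, multiplying
by precomputed inverse standard deviations so that zero-variance features map to
0.0 instead of triggering a division by zero. A small self-contained sketch of just
that scaling rule, with made-up std values (the patched code itself goes through
StandardScalerModel.getTransformFunc):

    import org.apache.spark.ml.linalg.Vectors

    val std = Array(2.0, 0.0, 0.5)  // the second feature is constant
    val inverseStd = std.map(s => if (s != 0) 1.0 / s else 0.0)

    val v = Vectors.dense(4.0, 7.0, 1.0)
    val scaled = Vectors.dense(v.toArray.zip(inverseStd).map { case (x, inv) => x * inv })
    // scaled == [2.0, 0.0, 2.0]: the constant feature is zeroed out,
    // matching the `if (std != 0)` guard in the patch.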
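The payoff inside BlockHingeAggregator.add is that one GEMV per block replaces one
DOT per instance. A toy sketch of that margin computation on a 4x2 row-major block
with invented values, using only the public ml.linalg API rather than the
aggregator's private BLAS calls:

    import org.apache.spark.ml.linalg.{DenseMatrix, Vectors}

    // Four 2-feature rows stacked row-major (isTransposed = true),
    // the same layout Matrices.fromVectors produces for InstanceBlock.
    val block = new DenseMatrix(4, 2,
      Array(1.0, 2.0,  0.0, 1.0,  3.0, 0.5,  1.0, 1.0), true)
    val linear = Vectors.dense(0.5, -0.25).toDense
    val intercept = 0.1

    // One matrix-vector multiply yields all four margins at once.
    val margins = block.multiply(linear).values.map(_ + intercept)

    // Hinge loss per row, with labels in {0, 1} rescaled to {-1, 1}
    // via the same `2 * label - 1` trick the aggregator uses.
    val labels = Array(1.0, 0.0, 1.0, 0.0)
    val losses = labels.zip(margins).map { case (y, m) =>
      math.max(0.0, 1.0 - (2 * y - 1) * m)
    }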