@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",

"org.apache.spark.ml.feature.Instance",
"org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
@@ -1008,6 +1008,47 @@ object SparseMatrix {
@Since("2.0.0")
object Matrices {

private[ml] def fromVectors(vectors: Seq[Vector]): Matrix = {
val numRows = vectors.length
val numCols = vectors.head.size
val denseSize = Matrices.getDenseSize(numCols, numRows)
val nnz = vectors.iterator.map(_.numNonzeros).sum
val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
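// Choose the more compact estimated layout: row-major dense if it is smaller, otherwise a CSR-style sparse matrix.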
if (denseSize < sparseSize) {
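// Dense, row-major fill: row j occupies values(j * numCols) through values(j * numCols + numCols - 1).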
val values = Array.ofDim[Double](numRows * numCols)
var offset = 0
var j = 0
while (j < numRows) {
vectors(j).foreachNonZero { (i, v) =>
values(offset + i) = v
}
offset += numCols
j += 1
}
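// isTransposed = true: the values array is interpreted row by row.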
new DenseMatrix(numRows, numCols, values, true)
} else {
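// Sparse build: accumulate column indices and values row by row, tracking a pointer per row.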
val colIndices = MArrayBuilder.make[Int]
val values = MArrayBuilder.make[Double]
val rowPtrs = MArrayBuilder.make[Int]
var rowPtr = 0
rowPtrs += 0
var j = 0
while (j < numRows) {
var nnz = 0
vectors(j).foreachNonZero { (i, v) =>
colIndices += i
values += v
nnz += 1
}
rowPtr += nnz
rowPtrs += rowPtr
j += 1
}
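// isTransposed = true: rowPtrs, colIndices and values form a row-major (CSR) sparse matrix.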
new SparseMatrix(numRows, numCols, rowPtrs.result(),
colIndices.result(), values.result(), true)
}
}

/**
* Creates a column-major dense matrix.
*
mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala (222 changes: 147 additions, 75 deletions)
@@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator.HingeAggregator
import org.apache.spark.ml.optim.aggregator._
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold {
with HasAggregationDepth with HasThreshold with HasBlockSize {

/**
* Param for threshold in binary classification prediction.
@@ -154,31 +156,65 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

/**
* Set block size for stacking input data in matrices.

Review thread:
Member: We might provide a little more comment about what this does. Increasing it increases performance, but at the risk of what? Slowing down on sparse input?
Contributor (Author): The choice of block size needs tuning; it depends on the dataset's sparsity and numFeatures. Increasing it may not always improve performance.

* If blockSize == 1, stacking is skipped and each vector is handled individually;
* if blockSize > 1, vectors are stacked into blocks and high-level BLAS routines are
* used where possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
* The recommended size is between 10 and 1000. An appropriate block size depends on the
* sparsity and dimension of the input dataset, the underlying BLAS implementation (for
* example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (for example, the number
* of threads). Note that existing BLAS implementations are mainly optimized for dense
* matrices; if the input dataset is sparse, stacking may bring no performance gain and
* may even cause a regression.
* Default is 1.
*
* @group expertSetParam
*/
@Since("3.1.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
setDefault(blockSize -> 1)
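
A minimal usage sketch of the new expert parameter (hypothetical example; training is assumed to be a DataFrame with "label" and "features" columns):

import org.apache.spark.ml.classification.LinearSVC

// Stack rows into 64-row blocks so the hinge aggregator can use GEMV/GEMM
// instead of per-row dot products; blockSize = 1 keeps the row-by-row path.
val svc = new LinearSVC()
  .setMaxIter(100)
  .setRegParam(0.1)
  .setBlockSize(64)
val model = svc.fit(training)
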

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE

val instances = extractInstances(dataset)
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

val instances = extractInstances(dataset)
.setName("training instances")

val (summarizer, labelSummarizer) =
val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
if (dataset.storageLevel == StorageLevel.NONE) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}
Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
instr.logNumExamples(summarizer.count)
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
} else {
// instances will be standardized and converted to blocks, so no need to cache instances.
Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
Seq("mean", "std", "count", "numNonZeros"))
}

val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
val numFeatures = summarizer.mean.size
val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures

instr.logNumExamples(summarizer.count)
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
if ($(blockSize) > 1) {
Review comment (Contributor, Author): I think it is up to the end user to choose whether high-level BLAS is used and which BLAS library is used. This code computes the sparsity of the dataset and logs a warning if the input is too sparse.
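// Sparsity = 1 - totalNonZeros / (numInstances * numFeatures); mostly-zero data triggers the warning below.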

val scale = 1.0 / summarizer.count / numFeatures
val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
instr.logNamedValue("sparsity", sparsity.toString)
if (sparsity > 0.5) {
instr.logWarning(s"sparsity of input dataset is $sparsity, " +
s"which may hurt performance in high-level BLAS.")
}
}

val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
case Some(n: Int) =>
@@ -192,77 +228,113 @@
instr.logNumClasses(numClasses)
instr.logNumFeatures(numFeatures)

val (coefficientVector, interceptVector, objectiveHistory) = {
if (numInvalid != 0) {
val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
s"Found $numInvalid invalid labels."
instr.logError(msg)
throw new SparkException(msg)
}
if (numInvalid != 0) {
val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
s"Found $numInvalid invalid labels."
instr.logError(msg)
throw new SparkException(msg)
}

val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regParamL2 = $(regParam)
val bcFeaturesStd = instances.context.broadcast(featuresStd)
val regularization = if (regParamL2 != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization(regParamL2, shouldApply,
if ($(standardization)) None else Some(getFeaturesStd)))
} else {
None
}
val featuresStd = summarizer.std.toArray
val getFeaturesStd = (j: Int) => featuresStd(j)
val regularization = if ($(regParam) != 0.0) {
val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
Some(new L2Regularization($(regParam), shouldApply,
if ($(standardization)) None else Some(getFeaturesStd)))
} else None

def regParamL1Fun = (index: Int) => 0.0
val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))

/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
trainOnRows(instances, featuresStd, regularization, optimizer)
} else {
trainOnBlocks(instances, featuresStd, regularization, optimizer)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()

val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
$(aggregationDepth))
if (rawCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
}

def regParamL1Fun = (index: Int) => 0D
val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept)
val coefficientArray = Array.tabulate(numFeatures) { i =>
if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
}
val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0
copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept))
}

val states = optimizer.iterations(new CachedDiffFunction(costFun),
initialCoefWithIntercept.asBreeze.toDenseVector)
private def trainOnRows(
instances: RDD[Instance],
featuresStd: Array[Double],
regularization: Option[L2Regularization],
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
val numFeatures = featuresStd.length
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures

val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc,
regularization, $(aggregationDepth))

val states = optimizer.iterations(new CachedDiffFunction(costFun),
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
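// Run OWLQN until convergence, collecting the adjusted objective value of each iteration.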

val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
bcFeaturesStd.destroy()

val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
scaledObjectiveHistory += state.adjustedValue
}
(if (state != null) state.x.toArray else null, arrayBuilder.result)
}

bcFeaturesStd.destroy()
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
}
private def trainOnBlocks(
instances: RDD[Instance],
featuresStd: Array[Double],
regularization: Option[L2Regularization],
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
val numFeatures = featuresStd.length
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures

/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val rawCoefficients = state.x.toArray
val coefficientArray = Array.tabulate(numFeatures) { i =>
if (featuresStd(i) != 0.0) {
rawCoefficients(i) / featuresStd(i)
} else {
0.0
}
}
val bcFeaturesStd = instances.context.broadcast(featuresStd)

val intercept = if ($(fitIntercept)) {
rawCoefficients(numFeaturesPlusIntercept - 1)
} else {
0.0
}
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
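// Pre-scale every feature by 1 / std so the block aggregator can work on already-standardized vectors.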
val standardized = instances.mapPartitions { iter =>
val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
}
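// Stack the standardized rows into blocks of $(blockSize) instances and cache them for the optimizer's repeated passes.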
val blocks = InstanceBlock.blokify(standardized, $(blockSize))
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training dataset (blockSize=${$(blockSize)})")

val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
regularization, $(aggregationDepth))

val states = optimizer.iterations(new CachedDiffFunction(costFun),
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)

val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
blocks.unpersist()
bcFeaturesStd.destroy()

if (handlePersistence) instances.unpersist()

copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
(if (state != null) state.x.toArray else null, arrayBuilder.result)
}
}
