@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
4242/** Params for linear SVM Classifier. */
4343private [classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
4444 with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
45- with HasAggregationDepth with HasThreshold with HasBlockSize {
45+ with HasAggregationDepth with HasThreshold with HasMaxBlockSizeInMB {
4646
4747 /**
4848 * Param for threshold in binary classification prediction.
@@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
5757 " threshold in binary classification prediction applied to rawPrediction" )
5858
5959 setDefault(regParam -> 0.0 , maxIter -> 100 , fitIntercept -> true , tol -> 1E-6 ,
60- standardization -> true , threshold -> 0.0 , aggregationDepth -> 2 , blockSize -> 1 )
60+ standardization -> true , threshold -> 0.0 , aggregationDepth -> 2 , maxBlockSizeInMB -> 0.0 )
6161}
6262
6363/**
@@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") (
153153 def setAggregationDepth (value : Int ): this .type = set(aggregationDepth, value)
154154
155155 /**
156- * Set block size for stacking input data in matrices.
157- * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
158- * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
159- * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
160- * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
161- * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
162- * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
163- * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
164- * input dataset is sparse, stacking may bring no performance gain, the worse is possible
165- * performance regression.
166- * Default is 1.
156+ * Sets the value of param [[maxBlockSizeInMB ]].
157+ * Default is 0.0.
167158 *
168159 * @group expertSetParam
169160 */
170161 @ Since (" 3.1.0" )
171- def setBlockSize (value : Int ): this .type = set(blockSize , value)
162+ def setMaxBlockSizeInMB (value : Double ): this .type = set(maxBlockSizeInMB , value)
172163
173164 @ Since (" 2.2.0" )
174165 override def copy (extra : ParamMap ): LinearSVC = defaultCopy(extra)
@@ -177,19 +168,19 @@ class LinearSVC @Since("2.2.0") (
177168 instr.logPipelineStage(this )
178169 instr.logDataset(dataset)
179170 instr.logParams(this , labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
180- regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
171+ regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
172+ maxBlockSizeInMB)
173+
174+ if (dataset.storageLevel != StorageLevel .NONE ) {
175+ instr.logWarning(s " Input instances will be standardized, blockified to blocks, and " +
176+ s " then cached during training. Be careful of double caching! " )
177+ }
181178
182179 val instances = extractInstances(dataset)
183180 .setName(" training instances" )
184181
185- if (dataset.storageLevel == StorageLevel .NONE && $(blockSize) == 1 ) {
186- instances.persist(StorageLevel .MEMORY_AND_DISK )
187- }
188-
189- var requestedMetrics = Seq (" mean" , " std" , " count" )
190- if ($(blockSize) != 1 ) requestedMetrics +:= " numNonZeros"
191182 val (summarizer, labelSummarizer) = Summarizer
192- .getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics )
183+ .getClassificationSummarizers(instances, $(aggregationDepth), Seq ( " mean " , " std " , " count " ) )
193184
194185 val histogram = labelSummarizer.histogram
195186 val numInvalid = labelSummarizer.countInvalid
@@ -199,14 +190,12 @@ class LinearSVC @Since("2.2.0") (
199190 instr.logNamedValue(" lowestLabelWeight" , labelSummarizer.histogram.min.toString)
200191 instr.logNamedValue(" highestLabelWeight" , labelSummarizer.histogram.max.toString)
201192 instr.logSumOfWeights(summarizer.weightSum)
202- if ($(blockSize) > 1 ) {
203- val scale = 1.0 / summarizer.count / numFeatures
204- val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
205- instr.logNamedValue(" sparsity" , sparsity.toString)
206- if (sparsity > 0.5 ) {
207- instr.logWarning(s " sparsity of input dataset is $sparsity, " +
208- s " which may hurt performance in high-level BLAS. " )
209- }
193+
194+ var actualBlockSizeInMB = $(maxBlockSizeInMB)
195+ if (actualBlockSizeInMB == 0 ) {
196+ actualBlockSizeInMB = InstanceBlock .DefaultBlockSizeInMB
197+ require(actualBlockSizeInMB > 0 , " inferred actual BlockSizeInMB must > 0" )
198+ instr.logNamedValue(" actualBlockSizeInMB" , actualBlockSizeInMB.toString)
210199 }
211200
212201 val numClasses = MetadataUtils .getNumClasses(dataset.schema($(labelCol))) match {
@@ -245,12 +234,8 @@ class LinearSVC @Since("2.2.0") (
245234 Note that the intercept in scaled space and original space is the same;
246235 as a result, no scaling is needed.
247236 */
248- val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1 ) {
249- trainOnRows(instances, featuresStd, regularization, optimizer)
250- } else {
251- trainOnBlocks(instances, featuresStd, regularization, optimizer)
252- }
253- if (instances.getStorageLevel != StorageLevel .NONE ) instances.unpersist()
237+ val (rawCoefficients, objectiveHistory) =
238+ trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)
254239
255240 if (rawCoefficients == null ) {
256241 val msg = s " ${optimizer.getClass.getName} failed. "
@@ -284,35 +269,9 @@ class LinearSVC @Since("2.2.0") (
284269 model.setSummary(Some (summary))
285270 }
286271
287- private def trainOnRows (
288- instances : RDD [Instance ],
289- featuresStd : Array [Double ],
290- regularization : Option [L2Regularization ],
291- optimizer : BreezeOWLQN [Int , BDV [Double ]]): (Array [Double ], Array [Double ]) = {
292- val numFeatures = featuresStd.length
293- val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
294-
295- val bcFeaturesStd = instances.context.broadcast(featuresStd)
296- val getAggregatorFunc = new HingeAggregator (bcFeaturesStd, $(fitIntercept))(_)
297- val costFun = new RDDLossFunction (instances, getAggregatorFunc,
298- regularization, $(aggregationDepth))
299-
300- val states = optimizer.iterations(new CachedDiffFunction (costFun),
301- Vectors .zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
302-
303- val arrayBuilder = mutable.ArrayBuilder .make[Double ]
304- var state : optimizer.State = null
305- while (states.hasNext) {
306- state = states.next()
307- arrayBuilder += state.adjustedValue
308- }
309- bcFeaturesStd.destroy()
310-
311- (if (state != null ) state.x.toArray else null , arrayBuilder.result)
312- }
313-
314- private def trainOnBlocks (
272+ private def trainImpl (
315273 instances : RDD [Instance ],
274+ actualBlockSizeInMB : Double ,
316275 featuresStd : Array [Double ],
317276 regularization : Option [L2Regularization ],
318277 optimizer : BreezeOWLQN [Int , BDV [Double ]]): (Array [Double ], Array [Double ]) = {
@@ -326,9 +285,11 @@ class LinearSVC @Since("2.2.0") (
326285 val func = StandardScalerModel .getTransformFunc(Array .empty, inverseStd, false , true )
327286 iter.map { case Instance (label, weight, vec) => Instance (label, weight, func(vec)) }
328287 }
329- val blocks = InstanceBlock .blokify(standardized, $(blockSize))
288+
289+ val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L ).ceil.toLong
290+ val blocks = InstanceBlock .blokifyWithMaxMemUsage(standardized, maxMemUsage)
330291 .persist(StorageLevel .MEMORY_AND_DISK )
331- .setName(s " training blocks (blockSize= ${$(blockSize)} ) " )
292+ .setName(s " training blocks (blockSizeInMB= $actualBlockSizeInMB ) " )
332293
333294 val getAggregatorFunc = new BlockHingeAggregator ($(fitIntercept))(_)
334295 val costFun = new RDDLossFunction (blocks, getAggregatorFunc,
0 commit comments