@@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.aggregator._
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.storage.StorageLevel

 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold {
+  with HasAggregationDepth with HasThreshold with HasBlockSize {

   /**
    * Param for threshold in binary classification prediction.
@@ -154,31 +156,54 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
   setDefault(aggregationDepth -> 2)

+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 1.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  setDefault(blockSize -> 1)
+
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

   override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-
-    val instances = extractInstances(dataset)
-    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
-
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)

-    val (summarizer, labelSummarizer) =
+    val instances = extractInstances(dataset).setName("training instances")
+
+    val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
+      if (dataset.storageLevel == StorageLevel.NONE) {
+        instances.persist(StorageLevel.MEMORY_AND_DISK)
+      }
       Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
-    instr.logNumExamples(summarizer.count)
-    instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
-    instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
-    instr.logSumOfWeights(summarizer.weightSum)
+    } else {
+      // instances will be standardized and converted to blocks, so no need to cache instances.
+      Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
+        Seq("mean", "std", "count", "numNonZeros"))
+    }

     val histogram = labelSummarizer.histogram
     val numInvalid = labelSummarizer.countInvalid
     val numFeatures = summarizer.mean.size
-    val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures
+
+    instr.logNumExamples(summarizer.count)
+    instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
+    instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
+    instr.logSumOfWeights(summarizer.weightSum)
+    if ($(blockSize) > 1) {
+      val sparsity = 1 - summarizer.numNonzeros.toArray.sum / summarizer.count / numFeatures
+      instr.logNamedValue("sparsity", sparsity.toString)
+      if (sparsity > 0.5) {
+        logWarning(s"sparsity of input dataset is $sparsity, " +
+          s"which may hurt performance in high-level BLAS.")
+      }
+    }

     val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
       case Some(n: Int) =>
@@ -192,78 +217,122 @@ class LinearSVC @Since("2.2.0") (
     instr.logNumClasses(numClasses)
     instr.logNumFeatures(numFeatures)

-    val (coefficientVector, interceptVector, objectiveHistory) = {
-      if (numInvalid != 0) {
-        val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
-          s"Found $numInvalid invalid labels."
-        instr.logError(msg)
-        throw new SparkException(msg)
-      }
-
-      val featuresStd = summarizer.std.toArray
-      val getFeaturesStd = (j: Int) => featuresStd(j)
-      val regParamL2 = $(regParam)
-      val bcFeaturesStd = instances.context.broadcast(featuresStd)
-      val regularization = if (regParamL2 != 0.0) {
-        val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
-        Some(new L2Regularization(regParamL2, shouldApply,
-          if ($(standardization)) None else Some(getFeaturesStd)))
-      } else {
-        None
-      }
+    if (numInvalid != 0) {
+      val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
+        s"Found $numInvalid invalid labels."
+      instr.logError(msg)
+      throw new SparkException(msg)
+    }

-      val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
-      val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
-        $(aggregationDepth))
+    val featuresStd = summarizer.std.toArray
+    val getFeaturesStd = (j: Int) => featuresStd(j)
+    val regularization = if ($(regParam) != 0.0) {
+      val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+      Some(new L2Regularization($(regParam), shouldApply,
+        if ($(standardization)) None else Some(getFeaturesStd)))
+    } else None
+
+    def regParamL1Fun = (index: Int) => 0D
+    val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
+
+    /*
+       The coefficients are trained in the scaled space; we're converting them back to
+       the original space.
+       Note that the intercept in scaled space and original space is the same;
+       as a result, no scaling is needed.
+     */
+    val state = if ($(blockSize) == 1) {
+      trainOnRows(instances, featuresStd, regularization, optimizer)
+    } else {
+      trainOnBlocks(instances, featuresStd, regularization, optimizer)
+    }
+    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()

-      def regParamL1Fun = (index: Int) => 0D
-      val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
-      val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept)
+    if (state == null) {
+      val msg = s"${optimizer.getClass.getName} failed."
+      instr.logError(msg)
+      throw new SparkException(msg)
+    }
+    val rawCoefficients = state.x.toArray

-      val states = optimizer.iterations(new CachedDiffFunction(costFun),
-        initialCoefWithIntercept.asBreeze.toDenseVector)
+    val coefficientArray = Array.tabulate(numFeatures) { i =>
+      if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
+    }
+    val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0
+    copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept))
+  }

-      val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
-      var state: optimizer.State = null
-      while (states.hasNext) {
-        state = states.next()
-        scaledObjectiveHistory += state.adjustedValue
-      }
+  private def trainOnRows(
+      instances: RDD[Instance],
+      featuresStd: Array[Double],
+      regularization: Option[L2Regularization],
+      optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = {
+    val numFeatures = featuresStd.length
+    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
+
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
+      regularization, $(aggregationDepth))
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+    val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
+    var state: optimizer.State = null
+    while (states.hasNext) {
+      state = states.next()
+      scaledObjectiveHistory += state.adjustedValue
+    }
+    bcFeaturesStd.destroy()

-      bcFeaturesStd.destroy()
-      if (state == null) {
-        val msg = s"${optimizer.getClass.getName} failed."
-        instr.logError(msg)
-        throw new SparkException(msg)
-      }
+    state
+  }

-      /*
-         The coefficients are trained in the scaled space; we're converting them back to
-         the original space.
-         Note that the intercept in scaled space and original space is the same;
-         as a result, no scaling is needed.
-       */
-      val rawCoefficients = state.x.toArray
-      val coefficientArray = Array.tabulate(numFeatures) { i =>
-        if (featuresStd(i) != 0.0) {
-          rawCoefficients(i) / featuresStd(i)
-        } else {
-          0.0
+  private def trainOnBlocks(
+      instances: RDD[Instance],
+      featuresStd: Array[Double],
+      regularization: Option[L2Regularization],
+      optimizer: BreezeOWLQN[Int, BDV[Double]]): optimizer.State = {
+    val numFeatures = featuresStd.length
+    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
+
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+
+    val standardized = instances.map {
+      case Instance(label, weight, features) =>
+        val featuresStd = bcFeaturesStd.value
+        val array = Array.ofDim[Double](numFeatures)
+        features.foreachNonZero { (i, v) =>
+          val std = featuresStd(i)
+          if (std != 0) array(i) = v / std
         }
-      }
-
-      val intercept = if ($(fitIntercept)) {
-        rawCoefficients(numFeaturesPlusIntercept - 1)
-      } else {
-        0.0
-      }
-      (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
+        Instance(label, weight, Vectors.dense(array))
     }
+    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+      .persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+    val getAggregatorFunc = new BlockHingeAggregator(numFeatures,
+      $(fitIntercept), $(blockSize))(_)
+    val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
+      regularization, $(aggregationDepth))
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+    val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
+    var state: optimizer.State = null
+    while (states.hasNext) {
+      state = states.next()
+      scaledObjectiveHistory += state.adjustedValue
+    }
+    blocks.unpersist()
+    bcFeaturesStd.destroy()

-    if (handlePersistence) instances.unpersist()
-
-    copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
+    state
   }
+
 }

 @Since("2.2.0")
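
A quick note on what this change exposes to callers: `blockSize` is an expert param on `LinearSVC` itself, so the block-based path is opt-in. The sketch below is illustrative only; the input path and the value 64 are placeholders, not recommendations from this patch:

```scala
import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("LinearSVCBlockSizeSketch").getOrCreate()

// Any DataFrame with "label" and "features" columns works; this path is just an example.
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val svc = new LinearSVC()
  .setMaxIter(50)
  .setRegParam(0.1)
  // blockSize == 1 (the default) keeps the original row-wise HingeAggregator path;
  // blockSize > 1 standardizes the instances, stacks them into InstanceBlocks, and
  // drives the hinge gradient through the new BlockHingeAggregator.
  .setBlockSize(64)

val model = svc.fit(training)
println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")
```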
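
And a rough, non-authoritative sketch of the stacking idea behind `InstanceBlock.blokify` (the real helper lives in Spark's private `ml.feature` internals and also handles weights, sparse storage, and partition boundaries; names here such as `SimpleBlock` and `blokifySketch` are made up for illustration):

```scala
import org.apache.spark.ml.linalg.{DenseMatrix, Vector}

// Hypothetical, simplified stand-in for InstanceBlock: labels/weights of the stacked
// rows plus one row-major matrix that a level-2/3 BLAS call can multiply at once.
case class SimpleBlock(labels: Array[Double], weights: Array[Double], matrix: DenseMatrix)

def blokifySketch(
    rows: Iterator[(Double, Double, Vector)],
    blockSize: Int): Iterator[SimpleBlock] = {
  rows.grouped(blockSize).map { group =>
    val numCols = group.head._3.size
    // Concatenate the (already standardized) rows into one flat row-major array.
    val values = group.flatMap(_._3.toArray).toArray
    SimpleBlock(
      group.map(_._1).toArray,
      group.map(_._2).toArray,
      new DenseMatrix(group.length, numCols, values, true)) // isTransposed = row-major
  }
}
```

The point of the stacking, as the sparsity warning above suggests, is that each block's hinge loss and gradient can then be computed with one matrix-vector multiply instead of one dot product per row, which pays off most when the input is reasonably dense.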