Skip to content

Commit a97a8fc

Browse files
committed
nit
nit
1 parent 0772834 commit a97a8fc

File tree

3 files changed

+83
-31
lines changed

3 files changed

+83
-31
lines changed

mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ class LinearSVC @Since("2.2.0") (
175175
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
176176
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
177177

178-
val instances = extractInstances(dataset).setName("training instances")
178+
val instances = extractInstances(dataset)
179+
.setName("training instances")
179180

180181
val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
181182
if (dataset.storageLevel == StorageLevel.NONE) {
@@ -201,7 +202,7 @@ class LinearSVC @Since("2.2.0") (
201202
val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
202203
instr.logNamedValue("sparsity", sparsity.toString)
203204
if (sparsity > 0.5) {
204-
logWarning(s"sparsity of input dataset is $sparsity, " +
205+
instr.logWarning(s"sparsity of input dataset is $sparsity, " +
205206
s"which may hurt performance in high-level BLAS.")
206207
}
207208
}
@@ -242,7 +243,7 @@ class LinearSVC @Since("2.2.0") (
242243
Note that the intercept in scaled space and original space is the same;
243244
as a result, no scaling is needed.
244245
*/
245-
val rawCoefficients = if ($(blockSize) == 1) {
246+
val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
246247
trainOnRows(instances, featuresStd, regularization, optimizer)
247248
} else {
248249
trainOnBlocks(instances, featuresStd, regularization, optimizer)
@@ -266,7 +267,7 @@ class LinearSVC @Since("2.2.0") (
266267
instances: RDD[Instance],
267268
featuresStd: Array[Double],
268269
regularization: Option[L2Regularization],
269-
optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = {
270+
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
270271
val numFeatures = featuresStd.length
271272
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
272273

@@ -278,22 +279,22 @@ class LinearSVC @Since("2.2.0") (
278279
val states = optimizer.iterations(new CachedDiffFunction(costFun),
279280
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
280281

281-
val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
282+
val arrayBuilder = mutable.ArrayBuilder.make[Double]
282283
var state: optimizer.State = null
283284
while (states.hasNext) {
284285
state = states.next()
285-
scaledObjectiveHistory += state.adjustedValue
286+
arrayBuilder += state.adjustedValue
286287
}
287288
bcFeaturesStd.destroy()
288289

289-
if (state == null) null else state.x.toArray
290+
(if (state == null) null else state.x.toArray, arrayBuilder.result)
290291
}
291292

292293
private def trainOnBlocks(
293294
instances: RDD[Instance],
294295
featuresStd: Array[Double],
295296
regularization: Option[L2Regularization],
296-
optimizer: BreezeOWLQN[Int, BDV[Double]]): Array[Double] = {
297+
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
297298
val numFeatures = featuresStd.length
298299
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
299300

@@ -321,16 +322,16 @@ class LinearSVC @Since("2.2.0") (
321322
val states = optimizer.iterations(new CachedDiffFunction(costFun),
322323
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
323324

324-
val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
325+
val arrayBuilder = mutable.ArrayBuilder.make[Double]
325326
var state: optimizer.State = null
326327
while (states.hasNext) {
327328
state = states.next()
328-
scaledObjectiveHistory += state.adjustedValue
329+
arrayBuilder += state.adjustedValue
329330
}
330331
blocks.unpersist()
331332
bcFeaturesStd.destroy()
332333

333-
if (state == null) null else state.x.toArray
334+
(if (state == null) null else state.x.toArray, arrayBuilder.result)
334335
}
335336
}
336337

mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,25 +208,16 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
208208
}
209209

210210
test("LinearSVC on blocks") {
211-
Seq(smallBinaryDataset, smallSparseBinaryDataset).foreach { dataset =>
212-
{
213-
val lsvc = new LinearSVC().setFitIntercept(false).setBlockSize(1).setMaxIter(5)
214-
val model = lsvc.fit(dataset)
215-
Seq(2, 4, 8, 16, 32).foreach { blockSize =>
216-
val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
217-
assert(model.intercept ~== model2.intercept relTol 1e-9)
218-
assert(model.coefficients ~== model2.coefficients relTol 1e-9)
219-
}
220-
}
221-
222-
{
223-
val lsvc = new LinearSVC().setFitIntercept(true).setBlockSize(1).setMaxIter(5)
224-
val model = lsvc.fit(dataset)
225-
Seq(2, 4, 8, 16, 32).foreach { blockSize =>
226-
val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
227-
assert(model.intercept ~== model2.intercept relTol 1e-9)
228-
assert(model.coefficients ~== model2.coefficients relTol 1e-9)
229-
}
211+
for (dataset <- Seq(smallBinaryDataset, smallSparseBinaryDataset);
212+
fitIntercept <- Seq(true, false)) {
213+
val lsvc = new LinearSVC()
214+
.setFitIntercept(fitIntercept)
215+
.setMaxIter(5)
216+
val model = lsvc.fit(dataset)
217+
Seq(4, 16, 64).foreach { blockSize =>
218+
val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
219+
assert(model.intercept ~== model2.intercept relTol 1e-9)
220+
assert(model.coefficients ~== model2.coefficients relTol 1e-9)
230221
}
231222
}
232223
}

mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.spark.ml.optim.aggregator
1818

1919
import org.apache.spark.SparkFunSuite
20-
import org.apache.spark.ml.feature.Instance
20+
import org.apache.spark.ml.feature.{Instance, InstanceBlock}
2121
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
2222
import org.apache.spark.ml.stat.Summarizer
2323
import org.apache.spark.ml.util.TestingUtils._
@@ -61,6 +61,20 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
6161
new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
6262
}
6363

64+
/**
 * Summarizes the given instances and builds a fresh [[BlockHingeAggregator]] from them.
 *
 * The number of features is taken from the length of the per-feature standard deviation
 * array reported by the classification summarizers, and the coefficients are broadcast
 * before being handed to the aggregator.
 *
 * @param instances data used to compute the summary statistics
 * @param coefficients coefficient vector to broadcast to the aggregator
 * @param fitIntercept forwarded unchanged to the aggregator constructor
 * @param blockSize forwarded unchanged to the aggregator constructor
 * @return a newly constructed aggregator
 */
private def getNewBlockAggregator(
    instances: Array[Instance],
    coefficients: Vector,
    fitIntercept: Boolean,
    blockSize: Int): BlockHingeAggregator = {
  val instanceRdd = sc.parallelize(instances)
  // Only the feature summarizer is needed here; the label summarizer is discarded.
  val (featureStats, _) = Summarizer.getClassificationSummarizers(instanceRdd)
  val featureCount = featureStats.std.toArray.length
  val broadcastCoefficients = spark.sparkContext.broadcast(coefficients)
  new BlockHingeAggregator(featureCount, fitIntercept, blockSize)(broadcastCoefficients)
}
77+
6478
test("aggregator add method input size") {
6579
val coefArray = Array(1.0, 2.0)
6680
val interceptArray = Array(2.0)
@@ -159,4 +173,50 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
159173
assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
160174
}
161175

176+
test("Block HingeAggregator") {
  val coefArray = Array(1.0, 2.0)
  val intercept = 1.0

  // Instances grouped two at a time and packed with InstanceBlock.fromInstances.
  val defaultBlocks = instances
    .grouped(2)
    .map(group => InstanceBlock.fromInstances(group))
    .toArray

  // The same blocks with every matrix converted via toSparseRowMajor.
  val sparseBlocks = defaultBlocks.map { b =>
    new InstanceBlock(b.labels, b.weights, b.matrix.toSparseRowMajor)
  }

  // Alternating layouts: even-indexed blocks use toDense, odd-indexed use toSparseRowMajor.
  val alternatingBlocks = defaultBlocks.zipWithIndex.map { case (b, idx) =>
    if (idx % 2 != 0) {
      new InstanceBlock(b.labels, b.weights, b.matrix.toSparseRowMajor)
    } else {
      new InstanceBlock(b.labels, b.weights, b.matrix.toDense)
    }
  }

  val layouts = Seq(defaultBlocks, sparseBlocks, alternatingBlocks)

  // First with an intercept term appended to the coefficients, then without one.
  val configs = Seq(
    (true, Vectors.dense(coefArray ++ Array(intercept))),
    (false, Vectors.dense(coefArray)))

  configs.foreach { case (withIntercept, coefficients) =>
    // Reference loss and gradient: the default blocks fed to an aggregator with blockSize = 1.
    val reference = getNewBlockAggregator(instances, coefficients,
      fitIntercept = withIntercept, blockSize = 1)
    defaultBlocks.foreach(reference.add)
    val expectedLoss = reference.loss
    val expectedGradient = reference.gradient

    // Every block layout and block size must reproduce the reference results.
    for (blocks <- layouts; blockSize <- Seq(1, 2, 4)) {
      val candidate = getNewBlockAggregator(instances, coefficients,
        fitIntercept = withIntercept, blockSize = blockSize)
      blocks.foreach(candidate.add)
      assert(expectedLoss ~== candidate.loss relTol 1e-9)
      assert(expectedGradient ~== candidate.gradient relTol 1e-9)
    }
  }
}
162222
}

0 commit comments

Comments
 (0)