Commit 317bde9

committed
use local variable
1 parent 48b2814 commit 317bde9

5 files changed: +43 -33 lines changed
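All five aggregators get the same micro-optimization: instead of updating the class fields `lossSum` and `weightSum` on every iteration of the hot loop, `add` accumulates into a local variable and flushes it into the field once per block. A local is a JVM stack slot the JIT can keep in a register, while the field updates are per-iteration heap writes. A minimal sketch of the pattern, with a hypothetical `Block` type standing in for `InstanceBlock`:

// Minimal sketch of the pattern used in this commit; Block is a
// hypothetical stand-in for InstanceBlock, for illustration only.
final case class Block(labels: Array[Double], weights: Array[Double])

class Aggregator extends Serializable {
  private var lossSum = 0.0
  private var weightSum = 0.0

  def add(block: Block): this.type = {
    val size = block.labels.length
    // Accumulate into a local: a stack slot the JIT can register-allocate,
    // instead of a heap write to the field on every iteration.
    var localLossSum = 0.0
    var i = 0
    while (i < size) {
      localLossSum += block.weights(i) * block.labels(i)  // stand-in loss term
      i += 1
    }
    // Flush into the fields once per block.
    lossSum += localLossSum
    weightSum += block.weights.sum
    this
  }
}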

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTAggregator.scala

Lines changed: 14 additions & 15 deletions
@@ -197,16 +197,12 @@ private[ml] class BlockAFTAggregator(
   * @return This BlockAFTAggregator object.
   */
  def add(block: InstanceBlock): this.type = {
-    // here use Instance.weight to store censor for convenience
-    val (matrix, labels, censors) = (block.matrix, block.labels, block.weightIter.toArray)
-    require(matrix.isTransposed)
-    require(numFeatures == matrix.numCols, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${matrix.numCols}.")
-    require(labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.")
-
-    val size = matrix.numRows
-    require(labels.length == size && censors.length == size)
+    require(block.matrix.isTransposed)
+    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
+      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+    require(block.labels.forall(_ > 0.0), "The lifetime or label should be greater than 0.")

+    val size = block.size
     val intercept = coefficientsArray(dim - 2)
     // sigma is the scale parameter of the AFT model
     val sigma = math.exp(coefficientsArray(dim - 1))
@@ -217,26 +213,30 @@ private[ml] class BlockAFTAggregator(
     } else {
       Vectors.zeros(size).toDense
     }
-    BLAS.gemv(1.0, matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)

     // in-place convert margins to gradient scales
     // then, vec represents gradient scales
+    var localLossSum = 0.0
     var i = 0
     var sigmaGradSum = 0.0
     while (i < size) {
-      val ti = labels(i)
-      val delta = censors(i)
+      val ti = block.getLabel(i)
+      // here use Instance.weight to store censor for convenience
+      val delta = block.getWeight(i)
       val margin = vec(i)
       val epsilon = (math.log(ti) - margin) / sigma
       val expEpsilon = math.exp(epsilon)
-      lossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon
+      localLossSum += delta * math.log(sigma) - delta * epsilon + expEpsilon
       val multiplier = (delta - expEpsilon) / sigma
       vec.values(i) = multiplier
       sigmaGradSum += delta + multiplier * sigma * epsilon
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += size

-    matrix match {
+    block.matrix match {
       case dm: DenseMatrix =>
         BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
           vec.values, 1, 1.0, gradientSumArray, 1)
@@ -250,7 +250,6 @@ private[ml] class BlockAFTAggregator(

     if (fitIntercept) gradientSumArray(dim - 2) += vec.values.sum
     gradientSumArray(dim - 1) += sigmaGradSum
-    weightSum += size

     this
   }
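Beyond the local loss accumulator, the AFT hunk stops destructuring the block into `(matrix, labels, censors)`, which materialized `block.weightIter.toArray`, and instead indexes the block directly with `getLabel`/`getWeight`. A sketch of the allocation difference, using a hypothetical block with the same accessors:

// Hypothetical block mirroring the InstanceBlock accessors used in the hunk.
class Block(labels: Array[Double], weights: Array[Double]) {
  def size: Int = labels.length
  def getLabel(i: Int): Double = labels(i)
  def getWeight(i: Int): Double = weights(i)
  def weightIter: Iterator[Double] = weights.iterator
}

// Before: copy the weights out of the block, then index the copy.
def censorSumCopying(block: Block): Double = {
  val censors = block.weightIter.toArray  // one extra array per block
  var s = 0.0
  var i = 0
  while (i < censors.length) { s += censors(i); i += 1 }
  s
}

// After: index the block directly; no intermediate allocation.
def censorSumDirect(block: Block): Double = {
  var s = 0.0
  var i = 0
  while (i < block.size) { s += block.getWeight(i); i += 1 }
  s
}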

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala

Lines changed: 4 additions & 2 deletions
@@ -162,24 +162,26 @@ private[ml] class BlockHingeAggregator(

     // in-place convert dotProducts to gradient scales
     // then, vec represents gradient scales
+    var localLossSum = 0.0
     var i = 0
     while (i < size) {
       val weight = block.getWeight(i)
       if (weight > 0) {
-        weightSum += weight
         // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
         // Therefore the gradient is -(2y - 1)*x
         val label = block.getLabel(i)
         val labelScaled = label + label - 1.0
         val loss = (1.0 - labelScaled * vec(i)) * weight
         if (loss > 0) {
-          lossSum += loss
+          localLossSum += loss
           val gradScale = -labelScaled * weight
           vec.values(i) = gradScale
         } else { vec.values(i) = 0.0 }
       } else { vec.values(i) = 0.0 }
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += block.weightIter.sum

     // predictions are all correct, no gradient signal
     if (vec.values.forall(_ == 0)) return this
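One behavioral subtlety: `weightSum` is now updated once with `block.weightIter.sum` instead of per positive weight inside the loop. The totals agree because the skipped weights are exactly zero; non-negativity is enforced by the `require` checks visible in the Huber and LeastSquares hunks below. A tiny equivalence check:

val weights = Array(0.0, 1.5, 0.0, 2.5)

// Old form: field updated per positive weight inside the loop.
var perElement = 0.0
for (w <- weights if w > 0) perElement += w

// New form: one bulk update; zero weights contribute nothing.
val bulk = weights.sum

assert(perElement == bulk)  // both 4.0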

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala

Lines changed: 7 additions & 4 deletions
@@ -167,7 +167,6 @@ private[ml] class BlockHuberAggregator(

   protected override val dim: Int = bcParameters.value.size
   private val numFeatures = if (fitIntercept) dim - 2 else dim - 1
-  private val sigma = bcParameters.value(dim - 1)
   private val intercept = if (fitIntercept) bcParameters.value(dim - 2) else 0.0
   // make transient so we do not serialize between aggregation stages
   @transient private lazy val linear = Vectors.dense(bcParameters.value.toArray.take(numFeatures))
@@ -187,7 +186,9 @@ private[ml] class BlockHuberAggregator(
       s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")

     if (block.weightIter.forall(_ == 0)) return this
+
     val size = block.size
+    val sigma = bcParameters.value(dim - 1)

     // vec here represents margins or dotProducts
     val vec = if (fitIntercept) {
@@ -200,23 +201,23 @@ private[ml] class BlockHuberAggregator(
     // in-place convert margins to multipliers
     // then, vec represents multipliers
     var sigmaGradSum = 0.0
+    var localLossSum = 0.0
     var i = 0
     while (i < size) {
       val weight = block.getWeight(i)
       if (weight > 0) {
-        weightSum += weight
         val label = block.getLabel(i)
         val margin = vec(i)
         val linearLoss = label - margin

         if (math.abs(linearLoss) <= sigma * epsilon) {
-          lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
+          localLossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
           val linearLossDivSigma = linearLoss / sigma
           val multiplier = -1.0 * weight * linearLossDivSigma
           vec.values(i) = multiplier
           sigmaGradSum += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
         } else {
-          lossSum += 0.5 * weight *
+          localLossSum += 0.5 * weight *
             (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
           val sign = if (linearLoss >= 0) -1.0 else 1.0
           val multiplier = weight * sign * epsilon
@@ -226,6 +227,8 @@ private[ml] class BlockHuberAggregator(
       } else { vec.values(i) = 0.0 }
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += block.weightIter.sum

     block.matrix match {
       case dm: DenseMatrix =>
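The Huber hunks also demote `sigma` from a constructor-time field to a local read inside `add`. As a field it is evaluated on the driver and serialized with the aggregator; as a local it is re-read from the broadcast on each call and never becomes part of the serialized state, in the spirit of the nearby "make transient so we do not serialize" comment. A minimal sketch, assuming a `Broadcast[Array[Double]]` parameter vector:

import org.apache.spark.broadcast.Broadcast

// Sketch only: field vs. local for a broadcast-derived value.
class AggBefore(bcParameters: Broadcast[Array[Double]]) extends Serializable {
  // Evaluated at construction and serialized with the instance.
  private val sigma = bcParameters.value.last
  def scale(x: Double): Double = x / sigma
}

class AggAfter(bcParameters: Broadcast[Array[Double]]) extends Serializable {
  def scale(x: Double): Double = {
    // Read from the broadcast inside the method; held only in a local.
    val sigma = bcParameters.value.last
    x / sigma
  }
}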

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala

Lines changed: 6 additions & 5 deletions
@@ -267,9 +267,6 @@ private[ml] class BlockLeastSquaresAggregator(
     val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
     (Vectors.dense(coefficientsArray), offset)
   }
-  // do not use tuple assignment above because it will circumvent the @transient tag
-  @transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
-  @transient private lazy val offset = effectiveCoefAndOffset._2

   /**
    * Add a new training instance block to this BlockLeastSquaresAggregator, and update the loss
@@ -286,24 +283,28 @@ private[ml] class BlockLeastSquaresAggregator(
       s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")

     if (block.weightIter.forall(_ == 0)) return this
+
     val size = block.size
+    val (effectiveCoefficientsVec, offset) = effectiveCoefAndOffset

     // vec here represents diffs
     val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
     BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)

     // in-place convert diffs to multipliers
     // then, vec represents multipliers
+    var localLossSum = 0.0
     var i = 0
     while (i < size) {
       val weight = block.getWeight(i)
       val diff = vec(i)
-      lossSum += weight * diff * diff / 2
-      weightSum += weight
+      localLossSum += weight * diff * diff / 2
       val multiplier = weight * diff
       vec.values(i) = multiplier
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += block.weightIter.sum

     val gradSumVec = new DenseVector(gradientSumArray)
     BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
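The LeastSquares hunks likewise drop the two `@transient lazy val` projections of `effectiveCoefAndOffset`. The deleted comment explains why they existed: a class-level tuple assignment would desugar into fields that circumvent the `@transient` tag. Destructuring inside `add` sidesteps the problem entirely, since locals are never serialized. A minimal sketch:

class Agg extends Serializable {
  // Recomputed lazily on each executor, never serialized.
  @transient private lazy val coefAndOffset: (Array[Double], Double) =
    (Array(1.0, 2.0), 0.5)  // placeholder values

  def add(x: Double): Double = {
    // Destructure into locals inside the method. A class-level
    // `val (coef, offset) = coefAndOffset` would desugar into extra
    // fields, defeating the @transient annotation above.
    val (coef, offset) = coefAndOffset
    coef.sum * x + offset
  }
}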

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala

Lines changed: 12 additions & 7 deletions
@@ -466,24 +466,26 @@ private[ml] class BlockLogisticAggregator(

     // in-place convert margins to multiplier
     // then, vec represents multiplier
+    var localLossSum = 0.0
     var i = 0
     while (i < size) {
       val weight = block.getWeight(i)
       if (weight > 0) {
-        weightSum += weight
         val label = block.getLabel(i)
         val margin = vec(i)
         if (label > 0) {
           // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
-          lossSum += weight * Utils.log1pExp(margin)
+          localLossSum += weight * Utils.log1pExp(margin)
         } else {
-          lossSum += weight * (Utils.log1pExp(margin) - margin)
+          localLossSum += weight * (Utils.log1pExp(margin) - margin)
         }
         val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
         vec.values(i) = multiplier
       } else { vec.values(i) = 0.0 }
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += block.weightIter.sum

     // predictions are all correct, no gradient signal
     if (vec.values.forall(_ == 0)) return
@@ -514,10 +516,11 @@ private[ml] class BlockLogisticAggregator(
     // mat here represents margins, shape: S X C
     val mat = DenseMatrix.zeros(size, numClasses)
     if (fitIntercept) {
+      val localCoefficientsArray = coefficientsArray
       val offset = numClasses * numFeatures
       var j = 0
       while (j < numClasses) {
-        val intercept = coefficientsArray(offset + j)
+        val intercept = localCoefficientsArray(offset + j)
         var i = 0
         while (i < size) { mat.update(i, j, intercept); i += 1 }
         j += 1
@@ -527,13 +530,13 @@ private[ml] class BlockLogisticAggregator(

     // in-place convert margins to multipliers
     // then, mat represents multipliers
+    var localLossSum = 0.0
     var i = 0
     val tmp = Array.ofDim[Double](numClasses)
     val interceptGradSumArr = if (fitIntercept) Array.ofDim[Double](numClasses) else null
     while (i < size) {
       val weight = block.getWeight(i)
       if (weight > 0) {
-        weightSum += weight
         val label = block.getLabel(i)

         var maxMargin = Double.NegativeInfinity
@@ -566,15 +569,17 @@ private[ml] class BlockLogisticAggregator(
         }

         if (maxMargin > 0) {
-          lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin)
+          localLossSum += weight * (math.log(sum) - marginOfLabel + maxMargin)
         } else {
-          lossSum += weight * (math.log(sum) - marginOfLabel)
+          localLossSum += weight * (math.log(sum) - marginOfLabel)
         }
       } else {
         var j = 0; while (j < numClasses) { mat.update(i, j, 0.0); j += 1 }
       }
       i += 1
     }
+    lossSum += localLossSum
+    weightSum += block.weightIter.sum

     // mat (multipliers): S X C, dense N
     // mat.transpose (multipliers): C X S, dense T
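In the multinomial path, `coefficientsArray` is additionally copied into `localCoefficientsArray` before the intercept loop. Assuming `coefficientsArray` is a `lazy val` backed by the broadcast coefficients, as in the other block aggregators, every read goes through the lazy-initialization accessor; hoisting the reference into a local pays that cost once rather than once per iteration. A sketch of the hoist:

class Agg {
  // Assumed shape: a lazy val backed by broadcast coefficients.
  @transient private lazy val coefficientsArray: Array[Double] = Array.fill(16)(1.0)

  def fill(mat: Array[Double]): Unit = {
    val localCoefficientsArray = coefficientsArray  // one accessor call
    var i = 0
    while (i < mat.length) {
      // Plain array read from the local; no lazy-val initialized
      // check on each iteration.
      mat(i) = localCoefficientsArray(i % 16)
      i += 1
    }
  }
}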
