Skip to content

Commit e8abb4b

Browse files
committed
remove some transient lazy variables
1 parent: e02a86e; commit: e8abb4b

File tree

3 files changed: 64 additions and 97 deletions.

mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -318,8 +318,7 @@ class LinearSVC @Since("2.2.0") (
318318
.persist(StorageLevel.MEMORY_AND_DISK)
319319
.setName(s"training dataset (blockSize=${$(blockSize)})")
320320

321-
val getAggregatorFunc = new BlockHingeAggregator(numFeatures,
322-
$(fitIntercept), $(blockSize))(_)
321+
val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
323322
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
324323
regularization, $(aggregationDepth))
325324

mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala

Lines changed: 24 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -118,33 +118,23 @@ private[ml] class HingeAggregator(
118118
* @param fitIntercept Whether to fit an intercept term.
119119
*/
120120
private[ml] class BlockHingeAggregator(
121-
numFeatures: Int,
122-
fitIntercept: Boolean,
123-
blockSize: Int)(bcCoefficients: Broadcast[Vector])
121+
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
124122
extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] {
125123

126-
private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
124+
protected override val dim: Int = bcCoefficients.value.size
125+
private val numFeatures = if (fitIntercept) dim - 1 else dim
126+
127127
@transient private lazy val coefficientsArray = bcCoefficients.value match {
128128
case DenseVector(values) => values
129129
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
130130
s" but got type ${bcCoefficients.value.getClass}.")
131131
}
132-
protected override val dim: Int = numFeaturesPlusIntercept
133132

134-
@transient private lazy val linear = if (fitIntercept) {
135-
Vectors.dense(coefficientsArray.take(numFeatures)).toDense
136-
} else {
137-
Vectors.dense(coefficientsArray).toDense
133+
@transient private lazy val linear = {
134+
val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray
135+
Vectors.dense(linear).toDense
138136
}
139137

140-
@transient private lazy val intercept =
141-
if (fitIntercept) coefficientsArray.last else 0.0
142-
143-
@transient private lazy val linearGradSumVec =
144-
if (fitIntercept) Vectors.zeros(numFeatures).toDense else null
145-
146-
@transient private lazy val auxiliaryVec = Vectors.zeros(blockSize).toDense
147-
148138
/**
149139
* Add a new training instance block to this HingeAggregator, and update the loss and gradient
150140
* of the objective function.
@@ -162,20 +152,18 @@ private[ml] class BlockHingeAggregator(
162152
if (block.weightIter.forall(_ == 0)) return this
163153
val size = block.size
164154

165-
// vec/arr here represents dotProducts
166-
val vec = if (size == blockSize) auxiliaryVec else Vectors.zeros(size).toDense
167-
val arr = vec.values
168-
169-
if (fitIntercept && intercept != 0) {
170-
java.util.Arrays.fill(arr, intercept)
171-
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
155+
// vec here represents dotProducts
156+
val vec = if (fitIntercept) {
157+
Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense
172158
} else {
173-
BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
159+
Vectors.zeros(size).toDense
174160
}
161+
BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
175162

176163
// in-place convert dotProducts to gradient scales
177-
// then, vec/arr represents gradient scales
164+
// then, vec represents gradient scales
178165
var i = 0
166+
var interceptGradSum = 0.0
179167
while (i < size) {
180168
val weight = block.getWeight(i)
181169
if (weight > 0) {
@@ -184,34 +172,32 @@ private[ml] class BlockHingeAggregator(
184172
// Therefore the gradient is -(2y - 1)*x
185173
val label = block.getLabel(i)
186174
val labelScaled = label + label - 1.0
187-
val loss = (1.0 - labelScaled * arr(i)) * weight
175+
val loss = (1.0 - labelScaled * vec.values(i)) * weight
188176
if (loss > 0) {
189177
lossSum += loss
190178
val gradScale = -labelScaled * weight
191-
arr(i) = gradScale
192-
} else {
193-
arr(i) = 0.0
194-
}
195-
} else {
196-
arr(i) = 0.0
197-
}
179+
vec.values(i) = gradScale
180+
if (fitIntercept) interceptGradSum += gradScale
181+
} else { vec.values(i) = 0.0 }
182+
} else { vec.values(i) = 0.0 }
198183
i += 1
199184
}
200185

201186
// predictions are all correct, no gradient signal
202-
if (arr.forall(_ == 0)) return this
187+
if (vec.values.forall(_ == 0)) return this
203188

204189
block.matrix match {
205190
case dm: DenseMatrix =>
206191
BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
207-
arr, 1, 1.0, gradientSumArray, 1)
208-
if (fitIntercept) gradientSumArray(numFeatures) += arr.sum
192+
vec.values, 1, 1.0, gradientSumArray, 1)
193+
if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum
209194

210195
case sm: SparseMatrix if fitIntercept =>
196+
val linearGradSumVec = Vectors.zeros(numFeatures).toDense
211197
BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
212198
BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
213199
gradientSumArray, 1)
214-
gradientSumArray(numFeatures) += arr.sum
200+
gradientSumArray(numFeatures) += interceptGradSum
215201

216202
case sm: SparseMatrix if !fitIntercept =>
217203
val gradSumVec = new DenseVector(gradientSumArray)

mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala

Lines changed: 39 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
2828
@transient var instances: Array[Instance] = _
2929
@transient var instancesConstantFeature: Array[Instance] = _
3030
@transient var instancesConstantFeatureFiltered: Array[Instance] = _
31+
@transient var standardizedInstances: Array[Instance] = _
3132

3233
override def beforeAll(): Unit = {
3334
super.beforeAll()
@@ -46,6 +47,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
4647
Instance(1.0, 0.5, Vectors.dense(1.0)),
4748
Instance(2.0, 0.3, Vectors.dense(0.5))
4849
)
50+
standardizedInstances = standardize(instances)
4951
}
5052

5153
/** Get summary statistics for some data and create a new HingeAggregator. */
@@ -61,18 +63,27 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
6163
new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
6264
}
6365

66+
private def standardize(instances: Array[Instance]): Array[Instance] = {
67+
val (featuresSummarizer, _) =
68+
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
69+
val stdArray = featuresSummarizer.std.toArray
70+
val numFeatures = stdArray.length
71+
instances.map { case Instance(label, weight, features) =>
72+
val standardized = Array.ofDim[Double](numFeatures)
73+
features.foreachNonZero { (i, v) =>
74+
val std = stdArray(i)
75+
if (std != 0) standardized(i) = v / std
76+
}
77+
Instance(label, weight, Vectors.dense(standardized).compressed)
78+
}
79+
}
80+
6481
/** Get summary statistics for some data and create a new BlockHingeAggregator. */
6582
private def getNewBlockAggregator(
66-
instances: Array[Instance],
6783
coefficients: Vector,
68-
fitIntercept: Boolean,
69-
blockSize: Int): BlockHingeAggregator = {
70-
val (featuresSummarizer, ySummarizer) =
71-
Summarizer.getClassificationSummarizers(sc.parallelize(instances))
72-
val featuresStd = featuresSummarizer.std.toArray
73-
val numFeatures = featuresStd.length
84+
fitIntercept: Boolean): BlockHingeAggregator = {
7485
val bcCoefficients = spark.sparkContext.broadcast(coefficients)
75-
new BlockHingeAggregator(numFeatures, fitIntercept, blockSize)(bcCoefficients)
86+
new BlockHingeAggregator(fitIntercept)(bcCoefficients)
7687
}
7788

7889
test("aggregator add method input size") {
@@ -153,8 +164,26 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
153164
}
154165
val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
155166

156-
assert(loss ~== agg.loss relTol 0.01)
157-
assert(gradient ~== agg.gradient relTol 0.01)
167+
assert(loss ~== agg.loss relTol 1e-9)
168+
assert(gradient ~== agg.gradient relTol 1e-9)
169+
170+
Seq(1, 2, 4).foreach { blockSize =>
171+
val blocks1 = standardizedInstances
172+
.grouped(blockSize)
173+
.map(seq => InstanceBlock.fromInstances(seq))
174+
.toArray
175+
val blocks2 = blocks1.map { block =>
176+
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
177+
}
178+
179+
Seq(blocks1, blocks2).foreach { blocks =>
180+
val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)),
181+
fitIntercept = true)
182+
blocks.foreach(blockAgg.add)
183+
assert(loss ~== blockAgg.loss relTol 1e-9)
184+
assert(gradient ~== blockAgg.gradient relTol 1e-9)
185+
}
186+
}
158187
}
159188

160189
test("check with zero standard deviation") {
@@ -172,51 +201,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
172201
assert(aggConstantFeatureBinary.gradient(0) === 0.0)
173202
assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
174203
}
175-
176-
test("Block HingeAggregator") {
177-
val coefArray = Array(1.0, 2.0)
178-
val intercept = 1.0
179-
val blocks1 = instances
180-
.grouped(2)
181-
.map(seq => InstanceBlock.fromInstances(seq))
182-
.toArray
183-
184-
val blocks2 = blocks1.map { block =>
185-
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
186-
}
187-
188-
val blocks3 = blocks1.zipWithIndex.map { case (block, i) =>
189-
if (i % 2 == 0) {
190-
new InstanceBlock(block.labels, block.weights, block.matrix.toDense)
191-
} else {
192-
new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
193-
}
194-
}
195-
196-
val agg1 = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
197-
fitIntercept = true, blockSize = 1)
198-
blocks1.foreach(agg1.add)
199-
val loss1 = agg1.loss
200-
val grad1 = agg1.gradient
201-
for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) {
202-
val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
203-
fitIntercept = true, blockSize = blockSize)
204-
blocks.foreach(agg.add)
205-
assert(loss1 ~== agg.loss relTol 1e-9)
206-
assert(grad1 ~== agg.gradient relTol 1e-9)
207-
}
208-
209-
val agg2 = getNewBlockAggregator(instances, Vectors.dense(coefArray),
210-
fitIntercept = false, blockSize = 1)
211-
blocks1.foreach(agg2.add)
212-
val loss2 = agg2.loss
213-
val grad2 = agg2.gradient
214-
for (blocks <- Seq(blocks1, blocks2, blocks3); blockSize <- Seq(1, 2, 4)) {
215-
val agg = getNewBlockAggregator(instances, Vectors.dense(coefArray),
216-
fitIntercept = false, blockSize = blockSize)
217-
blocks.foreach(agg.add)
218-
assert(loss2 ~== agg.loss relTol 1e-9)
219-
assert(grad2 ~== agg.gradient relTol 1e-9)
220-
}
221-
}
222204
}

0 commit comments

Comments (0)