Skip to content

Commit b6e9d5f

Browse files
committed
Address comments.
1 parent 5b8d9d0 commit b6e9d5f

File tree

2 files changed

+58
-55
lines changed

2 files changed

+58
-55
lines changed

mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ class GaussianMixture @Since("2.0.0") (
347347

348348
val instr = Instrumentation.create(this, instances)
349349
instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
350+
instr.logNumFeatures(numFeatures)
350351

351352
val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
352353
numClusters, numFeatures)
@@ -409,16 +410,14 @@ class GaussianMixture @Since("2.0.0") (
409410
}
410411

411412
val gaussianDists = gaussians.map { case (mean, covVec) =>
412-
val cov = new DenseMatrix(numFeatures, numFeatures,
413-
GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
413+
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
414414
new MultivariateGaussian(mean, cov)
415415
}
416416

417417
val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
418418
val summary = new GaussianMixtureSummary(model.transform(dataset),
419419
$(predictionCol), $(probabilityCol), $(featuresCol), $(k))
420420
model.setSummary(Some(summary))
421-
instr.logNumFeatures(model.gaussians.head.mean.size)
422421
instr.logSuccess(model)
423422
model
424423
}
@@ -439,7 +438,7 @@ class GaussianMixture @Since("2.0.0") (
439438
* @param numFeatures The number of features of training instance.
440439
* @return The initialized weights and corresponding gaussian distributions. Note the
441440
* covariance matrix of multivariate gaussian distribution is symmetric and
442-
* we only save the upper triangular part as a dense vector.
441+
* we only save the upper triangular part as a dense vector (column major).
443442
*/
444443
private def initRandom(
445444
instances: RDD[Vector],
@@ -463,8 +462,8 @@ class GaussianMixture @Since("2.0.0") (
463462
Construct matrix where diagonal entries are element-wise
464463
variance of input vectors (computes biased variance).
465464
Since the covariance matrix of multivariate gaussian distribution is symmetric,
466-
only the upper triangular part of the matrix will be saved as a dense vector
467-
in order to reduce the shuffled data size.
465+
only the upper triangular part of the matrix (column major) will be saved as
466+
a dense vector in order to reduce the shuffled data size.
468467
*/
469468
val cov = {
470469
val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
@@ -505,20 +504,20 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
505504

506505
/**
507506
* Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
508-
* into an n * n array representing the full symmetric matrix.
507+
* into an n * n array representing the full symmetric matrix (column major).
509508
*
510509
* @param n The order of the n by n matrix.
511510
* @param triangularValues The upper triangular part of the matrix packed in an array
512511
* (column major).
513-
* @return An array which represents the symmetric matrix in column major.
512+
* @return A dense matrix which represents the symmetric matrix in column major.
514513
*/
515514
private[clustering] def unpackUpperTriangularMatrix(
516515
n: Int,
517-
triangularValues: Array[Double]): Array[Double] = {
516+
triangularValues: Array[Double]): DenseMatrix = {
518517
val symmetricValues = new Array[Double](n * n)
519518
var r = 0
520519
var i = 0
521-
while(i < n) {
520+
while (i < n) {
522521
var j = 0
523522
while (j <= i) {
524523
symmetricValues(i * n + j) = triangularValues(r)
@@ -528,15 +527,15 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
528527
}
529528
i += 1
530529
}
531-
symmetricValues
530+
new DenseMatrix(n, n, symmetricValues)
532531
}
533532

534533
/**
535534
* Update the weight, mean and covariance of gaussian distribution.
536535
*
537536
* @param mean The mean of the gaussian distribution.
538537
* @param cov The covariance matrix of the gaussian distribution. Note we only
539-
* save the upper triangular part as a dense vector.
538+
* save the upper triangular part as a dense vector (column major).
540539
* @param weight The weight of the gaussian distribution.
541540
* @param sumWeights The sum of weights of all clusters.
542541
* @return The updated weight, mean and covariance.
@@ -562,8 +561,8 @@ object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
562561
* @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
563562
* @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
564563
* in the mixture. Note only upper triangular part of the covariance
565-
* matrix of each distribution is stored as dense vector in order to
566-
* reduce shuffled data size.
564+
* matrix of each distribution is stored as dense vector (column major)
565+
* in order to reduce shuffled data size.
567566
*/
568567
private class ExpectationAggregator(
569568
numFeatures: Int,
@@ -581,8 +580,7 @@ private class ExpectationAggregator(
581580

582581
@transient private lazy val oldGaussians = {
583582
bcGaussians.value.map { case (mean, covVec) =>
584-
val cov = new DenseMatrix(numFeatures, numFeatures,
585-
GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
583+
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
586584
new MultivariateGaussian(mean, cov)
587585
}
588586
}
@@ -611,7 +609,7 @@ private class ExpectationAggregator(
611609
val prob = new Array[Double](k)
612610
var probSum = 0.0
613611
var i = 0
614-
while(i < k) {
612+
while (i < k) {
615613
val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
616614
prob(i) = p
617615
probSum += p
@@ -623,7 +621,7 @@ private class ExpectationAggregator(
623621
val localNewMeans = newMeans
624622
val localNewCovs = newCovs
625623
i = 0
626-
while(i < k) {
624+
while (i < k) {
627625
prob(i) /= probSum
628626
localNewWeights(i) += prob(i)
629627
BLAS.axpy(prob(i), instance, localNewMeans(i))
@@ -654,7 +652,7 @@ private class ExpectationAggregator(
654652
val localThisNewCovs = this.newCovs
655653
val localOtherNewCovs = other.newCovs
656654
var i = 0
657-
while(i < k) {
655+
while (i < k) {
658656
localThisNewWeights(i) += localOtherNewWeights(i)
659657
BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
660658
BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))

mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.spark.ml.clustering
1919

2020
import org.apache.spark.SparkFunSuite
21-
import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}
21+
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
2222
import org.apache.spark.ml.param.ParamMap
23+
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
2324
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
2425
import org.apache.spark.ml.util.TestingUtils._
2526
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -33,6 +34,7 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
3334
import GaussianMixtureSuite._
3435

3536
final val k = 5
37+
private val seed = 538009335
3638
@transient var dataset: Dataset[_] = _
3739
@transient var denseDataset: Dataset[_] = _
3840
@transient var sparseDataset: Dataset[_] = _
@@ -45,7 +47,7 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
4547
dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
4648
denseDataset = denseData.map(FeatureData).toDF()
4749
sparseDataset = denseData.map { point =>
48-
FeatureData(Vectors.sparse(1, Array(0), point.toArray))
50+
FeatureData(point.toSparse)
4951
}.toDF()
5052
decompositionDataset = decompositionData.map(FeatureData).toDF()
5153
rDataset = rData.map(FeatureData).toDF()
@@ -144,40 +146,36 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
144146

145147
test("univariate dense data with two clusters") {
146148
val weights = Array(2.0 / 3.0, 1.0 / 3.0)
147-
val mean = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
148-
val cov = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
149-
150-
val gmm = new GaussianMixture().setK(2).fit(denseDataset)
149+
val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
150+
val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
151+
val gaussians = means.zip(covs).map { case (mean, cov) =>
152+
new MultivariateGaussian(mean, cov)
153+
}
151154

152-
assert(gmm.weights(0) ~== weights(0) absTol 1E-3)
153-
assert(gmm.weights(1) ~== weights(1) absTol 1E-3)
154-
assert(gmm.gaussians(0).mean ~== mean(0) absTol 1E-3)
155-
assert(gmm.gaussians(1).mean ~== mean(1) absTol 1E-3)
156-
assert(gmm.gaussians(0).cov ~== cov(0) absTol 1E-3)
157-
assert(gmm.gaussians(1).cov ~== cov(1) absTol 1E-3)
155+
val expected = new GaussianMixtureModel("dummy", weights, gaussians)
156+
val actual = new GaussianMixture().setK(2).setSeed(seed).fit(denseDataset)
157+
modelEquals(expected, actual)
158158
}
159159

160160
test("univariate sparse data with two clusters") {
161161
val weights = Array(2.0 / 3.0, 1.0 / 3.0)
162-
val mean = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
163-
val cov = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
164-
165-
val gmm = new GaussianMixture().setK(2).fit(sparseDataset)
162+
val means = Array(Vectors.dense(5.1604), Vectors.dense(-4.3673))
163+
val covs = Array(Matrices.dense(1, 1, Array(0.86644)), Matrices.dense(1, 1, Array(1.1098)))
164+
val gaussians = means.zip(covs).map { case (mean, cov) =>
165+
new MultivariateGaussian(mean, cov)
166+
}
166167

167-
assert(gmm.weights(0) ~== weights(0) absTol 1E-3)
168-
assert(gmm.weights(1) ~== weights(1) absTol 1E-3)
169-
assert(gmm.gaussians(0).mean ~== mean(0) absTol 1E-3)
170-
assert(gmm.gaussians(1).mean ~== mean(1) absTol 1E-3)
171-
assert(gmm.gaussians(0).cov ~== cov(0) absTol 1E-3)
172-
assert(gmm.gaussians(1).cov ~== cov(1) absTol 1E-3)
168+
val expected = new GaussianMixtureModel("dummy", weights, gaussians)
169+
val actual = new GaussianMixture().setK(2).setSeed(seed).fit(sparseDataset)
170+
modelEquals(expected, actual)
173171
}
174172

175173
test("check distributed decomposition") {
176174
val k = 5
177175
val d = decompositionData.head.size
178176
assert(GaussianMixture.shouldDistributeGaussians(k, d))
179177

180-
val gmm = new GaussianMixture().setK(k).fit(decompositionDataset)
178+
val gmm = new GaussianMixture().setK(k).setSeed(seed).fit(decompositionDataset)
181179
assert(gmm.getK === k)
182180
}
183181

@@ -213,18 +211,16 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
213211
[2,] 0.1607830 1.008878
214212
*/
215213
val weights = Array(0.5333333, 0.4666667)
216-
val mean = Array(Vectors.dense(10.363673, 9.897081), Vectors.dense(0.11731091, -0.06192351))
217-
val cov = Array(Matrices.dense(2, 2, Array(0.2961543, 0.1607830, 0.160783, 1.008878)),
214+
val means = Array(Vectors.dense(10.363673, 9.897081), Vectors.dense(0.11731091, -0.06192351))
215+
val covs = Array(Matrices.dense(2, 2, Array(0.2961543, 0.1607830, 0.160783, 1.008878)),
218216
Matrices.dense(2, 2, Array(0.62049934, 0.06880802, 0.06880802, 1.27431874)))
217+
val gaussians = means.zip(covs).map { case (mean, cov) =>
218+
new MultivariateGaussian(mean, cov)
219+
}
219220

220-
val gmm = new GaussianMixture().setK(2).fit(rDataset)
221-
222-
assert(gmm.weights(0) ~== weights(0) absTol 1E-3)
223-
assert(gmm.weights(1) ~== weights(1) absTol 1E-3)
224-
assert(gmm.gaussians(0).mean ~== mean(0) absTol 1E-3)
225-
assert(gmm.gaussians(1).mean ~== mean(1) absTol 1E-3)
226-
assert(gmm.gaussians(0).cov ~== cov(0) absTol 1E-3)
227-
assert(gmm.gaussians(1).cov ~== cov(1) absTol 1E-3)
221+
val expected = new GaussianMixtureModel("dummy", weights, gaussians)
222+
val actual = new GaussianMixture().setK(2).setSeed(seed).fit(rDataset)
223+
modelEquals(expected, actual)
228224
}
229225

230226
test("upper triangular matrix unpacking") {
@@ -238,12 +234,13 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
238234
val triangularValues = Array(1.0, 2.5, 2.0, 3.8, 7.2, 3.0, 0.9, 3.8, 1.0, 4.0)
239235
val symmetricValues = Array(1.0, 2.5, 3.8, 0.9, 2.5, 2.0, 7.2, 3.8,
240236
3.8, 7.2, 3.0, 1.0, 0.9, 3.8, 1.0, 4.0)
241-
val expected = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
242-
assert(symmetricValues === expected)
237+
val symmetricMatrix = new DenseMatrix(4, 4, symmetricValues)
238+
val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
239+
assert(symmetricMatrix === expectedMatrix)
243240
}
244241
}
245242

246-
object GaussianMixtureSuite {
243+
object GaussianMixtureSuite extends SparkFunSuite {
247244
/**
248245
* Mapping from all Params to valid settings which differ from the defaults.
249246
* This is useful for tests which need to exercise all Params, such as save/load.
@@ -281,4 +278,12 @@ object GaussianMixtureSuite {
281278
)
282279

283280
case class FeatureData(features: Vector)
281+
282+
def modelEquals(m1: GaussianMixtureModel, m2: GaussianMixtureModel): Unit = {
283+
assert(m1.weights.length === m2.weights.length)
284+
for (i <- m1.weights.indices) {
285+
assert(m1.gaussians(i).mean ~== m2.gaussians(i).mean absTol 1E-3)
286+
assert(m1.gaussians(i).cov ~== m2.gaussians(i).cov absTol 1E-3)
287+
}
288+
}
284289
}

0 commit comments

Comments
 (0)