@@ -21,14 +21,14 @@ import breeze.linalg.{DenseVector => BDV}
import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.impl.Utils.EPSILON
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util._
import org.apache.spark.mllib.clustering.{GaussianMixture => MLlibGM}
import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
@@ -45,6 +45,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w

/**
* Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
*
* @group param
*/
@Since("2.0.0")
@@ -57,6 +58,7 @@ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter w

/**
* Validates and transforms the input schema.
*
* @param schema input schema
* @return output schema
*/
@@ -238,6 +240,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {

/**
* Compute the probability (partial assignment) for each cluster for the given data point.
*
* @param features Data point
* @param dists Gaussians for model
* @param weights Weights for each Gaussian
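For reference, the quantity documented above is the standard mixture responsibility; a sketch in LaTeX, using the weights and Gaussians named in the params:

```latex
% Responsibility (partial assignment) of cluster k for data point x:
r_k(x) = \frac{w_k \, \mathcal{N}(x \mid \mu_k, \Sigma_k)}
              {\sum_{j=1}^{K} w_j \, \mathcal{N}(x \mid \mu_j, \Sigma_j)}
```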
@@ -323,31 +326,98 @@ class GaussianMixture @Since("2.0.0") (
@Since("2.0.0")
def setSeed(value: Long): this.type = set(seed, value)

/**
* Number of samples per cluster to use when initializing Gaussians.
*/
private val numSamples = 5

@Since("2.0.0")
override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
transformSchema(dataset.schema, logging = true)
val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(point: Vector) => OldVectors.fromML(point)
}

val instr = Instrumentation.create(this, rdd)
val sc = dataset.sparkSession.sparkContext
val numClusters = $(k)

val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
case Row(features: Vector) => features
}.cache()

// Extract the number of features.

Contributor:
this comment adds no value IMO

val numFeatures = instances.first().size

val instr = Instrumentation.create(this, instances)
instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
instr.logNumFeatures(numFeatures)

val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
numClusters, numFeatures)

// TODO: SPARK-15785 Support users supplied initial GMM.
val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)

var logLikelihood = Double.MinValue
var logLikelihoodPrev = 0.0

var iter = 0
while (iter < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {

val bcWeights = instances.sparkContext.broadcast(weights)
val bcGaussians = instances.sparkContext.broadcast(gaussians)

// aggregate the cluster contribution for all sample points
val sums = instances.treeAggregate(
new ExpectationAggregator(numFeatures, bcWeights, bcGaussians))(
seqOp = (c, v) => (c, v) match {
case (aggregator, instance) => aggregator.add(instance)
},
combOp = (c1, c2) => (c1, c2) match {
case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
})

bcWeights.destroy(blocking = false)
bcGaussians.destroy(blocking = false)

/*
Create new distributions based on the partial assignments
(often referred to as the "M" step in literature)
*/
val sumWeights = sums.weights.sum

if (shouldDistributeGaussians) {
val numPartitions = math.min(numClusters, 1024)
val tuples = Seq.tabulate(numClusters) { i =>
(sums.means(i), sums.covs(i), sums.weights(i))
}
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, cov, weight) =>
GaussianMixture.updateWeightsAndGaussians(mean, cov, weight, sumWeights)
}.collect().unzip
Array.copy(ws.toArray, 0, weights, 0, ws.length)
Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
} else {
var i = 0
while (i < numClusters) {
val (weight, gaussian) = GaussianMixture.updateWeightsAndGaussians(
sums.means(i), sums.covs(i), sums.weights(i), sumWeights)
weights(i) = weight
gaussians(i) = gaussian
i += 1
}
}

logLikelihoodPrev = logLikelihood // current becomes previous
logLikelihood = sums.logLikelihood // this is the freshly computed log-likelihood
iter += 1
}

val algo = new MLlibGM()
.setK($(k))
.setMaxIterations($(maxIter))
.setSeed($(seed))
.setConvergenceTol($(tol))
val parentModel = algo.run(rdd)
val gaussians = parentModel.gaussians.map { case g =>
new MultivariateGaussian(g.mu.asML, g.sigma.asML)
val gaussianDists = gaussians.map { case (mean, covVec) =>
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
new MultivariateGaussian(mean, cov)
}
val model = copyValues(new GaussianMixtureModel(uid, parentModel.weights, gaussians))
.setParent(this)

val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
val summary = new GaussianMixtureSummary(model.transform(dataset),
$(predictionCol), $(probabilityCol), $(featuresCol), $(k))
model.setSummary(Some(summary))
instr.logNumFeatures(model.gaussians.head.mean.size)
instr.logSuccess(model)
model
}
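
The loop above is a standard EM iteration; a sketch of the convergence criterion it implements, assuming N training instances and K = numClusters:

```latex
% Observed-data log-likelihood computed by each E-step pass:
\ell(\theta) = \sum_{i=1}^{N} \log \sum_{k=1}^{K} w_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k)
% The loop stops when |\ell_t - \ell_{t-1}| \le tol, or after maxIter iterations.
```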
@@ -356,13 +426,242 @@ class GaussianMixture @Since("2.0.0") (
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}

/**
* Initialize weights and corresponding gaussian distributions at random.
*
* We start with uniform weights, a random mean from the data, and diagonal covariance matrices
* using component variances derived from the samples.
*
* @param instances The training instances.
* @param numClusters The number of clusters.
* @param numFeatures The number of features of a training instance.
* @return The initialized weights and corresponding gaussian distributions. Note that the
* covariance matrix of a multivariate gaussian distribution is symmetric and
* we only save the upper triangular part as a dense vector (column major).
*/
private def initRandom(
instances: RDD[Vector],
numClusters: Int,
numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
val slice = samples.view(i * numSamples, (i + 1) * numSamples)
val mean = {
val v = new DenseVector(new Array[Double](numFeatures))
var i = 0
while (i < numSamples) {
BLAS.axpy(1.0, slice(i), v)
i += 1
}
BLAS.scal(1.0 / numSamples, v)
v
}
/*
Construct a matrix whose diagonal entries are the element-wise
variance of the input vectors (biased variance).
Since the covariance matrix of a multivariate gaussian distribution is symmetric,
only the upper triangular part of the matrix (column major) is saved as
a dense vector in order to reduce the shuffled data size.
*/
val cov = {
val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
val diagVec = Vectors.fromBreeze(ss)
BLAS.scal(1.0 / numSamples, diagVec)
val covVec = new DenseVector(Array.fill[Double](
numFeatures * (numFeatures + 1) / 2)(0.0))
diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
covVec.values(i + i * (i + 1) / 2) = v
}
covVec
}
(mean, cov)
}
(weights, gaussians)
}
}
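
A sketch of what initRandom computes for each cluster k from its m = numSamples sampled points, and of the packed layout it emits:

```latex
\mu_k = \frac{1}{m} \sum_{s=1}^{m} x_s, \qquad
\sigma_{k,j}^{2} = \frac{1}{m} \sum_{s=1}^{m} \left( x_{s,j} - \mu_{k,j} \right)^{2}
% Column-major upper-triangular packing stores entry (row j, col i), j <= i, at
% offset j + i(i+1)/2, so the diagonal entry (i, i) lands at i + i(i+1)/2 and
% only n(n+1)/2 of the n^2 covariance entries are shuffled.
```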

@Since("2.0.0")
object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {

@Since("2.0.0")
override def load(path: String): GaussianMixture = super.load(path)

/**
* Heuristic to decide whether to distribute the computation of the [[MultivariateGaussian]]s:
* distribute approximately when numFeatures > 25, except when numClusters is very small.
*
* @param numClusters Number of clusters
* @param numFeatures Number of features
*/
private[clustering] def shouldDistributeGaussians(
numClusters: Int,
numFeatures: Int): Boolean = {
((numClusters - 1.0) / numClusters) * numFeatures > 25.0
}
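
A few spot checks of the threshold (hypothetical values; the helper is private[clustering], so these would only compile inside that package):

```scala
// Distribute when ((numClusters - 1) / numClusters) * numFeatures exceeds 25.0.
assert(!GaussianMixture.shouldDistributeGaussians(2, 50))  // 0.5 * 50 = 25.0, not > 25
assert(GaussianMixture.shouldDistributeGaussians(2, 52))   // 0.5 * 52 = 26.0 > 25
assert(GaussianMixture.shouldDistributeGaussians(10, 30))  // 0.9 * 30 = 27.0 > 25
```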

/**
* Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
* into an n * n array representing the full symmetric matrix (column major).
*
* @param n The order of the n by n matrix.
* @param triangularValues The upper triangular part of the matrix packed in an array
* (column major).
* @return A dense matrix which represents the symmetric matrix in column major.
*/
private[clustering] def unpackUpperTriangularMatrix(
n: Int,
triangularValues: Array[Double]): DenseMatrix = {
val symmetricValues = new Array[Double](n * n)
var r = 0
var i = 0
while (i < n) {
var j = 0
while (j <= i) {
symmetricValues(i * n + j) = triangularValues(r)
symmetricValues(j * n + i) = triangularValues(r)
r += 1
j += 1
}
i += 1
}
new DenseMatrix(n, n, symmetricValues)
}
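
A worked example with assumed values: for n = 3, the packed upper triangle [1, 2, 3, 4, 5, 6] expands as follows.

```scala
// Packed columns: (0,0)=1 | (0,1)=2, (1,1)=3 | (0,2)=4, (1,2)=5, (2,2)=6
val m = GaussianMixture.unpackUpperTriangularMatrix(3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// m is the full symmetric 3 x 3 matrix (column major):
//   1.0  2.0  4.0
//   2.0  3.0  5.0
//   4.0  5.0  6.0
```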

/**
* Update the weight, mean and covariance of a gaussian distribution.
*
* @param mean The mean of the gaussian distribution.
* @param cov The covariance matrix of the gaussian distribution. Note we only
* save the upper triangular part as a dense vector (column major).
* @param weight The weight of the gaussian distribution.
* @param sumWeights The sum of weights of all clusters.
* @return The updated weight, mean and covariance.
*/
private[clustering] def updateWeightsAndGaussians(
mean: DenseVector,
cov: DenseVector,
weight: Double,
sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
BLAS.scal(1.0 / weight, mean)
BLAS.spr(-weight, mean, cov)
BLAS.scal(1.0 / weight, cov)
val newWeight = weight / sumWeights
val newGaussian = (mean, cov)
(newWeight, newGaussian)
}
}
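
For reference, updateWeightsAndGaussians implements the usual M-step closed forms: on entry, weight holds the responsibility sum for one cluster, mean holds the responsibility-weighted sum of instances, and cov holds the packed responsibility-weighted sum of outer products. A sketch of what the three BLAS calls compute:

```latex
\mu_k = \frac{\sum_i r_{ik}\, x_i}{\sum_i r_{ik}}, \qquad
\Sigma_k = \frac{\sum_i r_{ik}\, x_i x_i^{\top}}{\sum_i r_{ik}} - \mu_k \mu_k^{\top}, \qquad
w_k = \frac{\sum_i r_{ik}}{\text{sumWeights}}
```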

/**
* ExpectationAggregator computes the partial expectation results.

Contributor:
We have typically used this documentation as a place to explain the math used to compute updates. It would be nice to have that here as well.

*
* @param numFeatures The number of features.
* @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
* @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
* in the mixture. Note only upper triangular part of the covariance
* matrix of each distribution is stored as dense vector (column major)
* in order to reduce shuffled data size.
*/
private class ExpectationAggregator(
numFeatures: Int,
bcWeights: Broadcast[Array[Double]],
bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {

private val k: Int = bcWeights.value.length
private var totalCnt: Long = 0L
private var newLogLikelihood: Double = 0.0

Contributor:
why are we using "new" as a prefix here and elsewhere?

Contributor Author:
ExpectationAggregator accepts the old weights and gaussians and outputs the new ones; I added the prefix to make this clearer.

private val newWeights: Array[Double] = new Array[Double](k)
private val newMeans: Array[DenseVector] = Array.fill(k)(
new DenseVector(Array.fill[Double](numFeatures)(0.0)))
private val newCovs: Array[DenseVector] = Array.fill(k)(
new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))

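// Note: rebuilt lazily on each executor from the broadcast packed covariances;
// @transient keeps the unpacked MultivariateGaussian objects out of the serialized closure.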
@transient private lazy val oldGaussians = {
bcGaussians.value.map { case (mean, covVec) =>
val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
new MultivariateGaussian(mean, cov)
}
}

def count: Long = totalCnt

def logLikelihood: Double = newLogLikelihood

def weights: Array[Double] = newWeights

def means: Array[DenseVector] = newMeans

def covs: Array[DenseVector] = newCovs

/**
* Add a new training instance to this ExpectationAggregator, update the weights,
* means and covariances for each distribution, and update the log likelihood.
*
* @param instance The data point to be added.
* @return This ExpectationAggregator object.
*/
def add(instance: Vector): this.type = {
val localWeights = bcWeights.value
val localOldGaussians = oldGaussians

val prob = new Array[Double](k)
var probSum = 0.0
var i = 0
while (i < k) {
val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
prob(i) = p
probSum += p
i += 1
}

newLogLikelihood += math.log(probSum)
val localNewWeights = newWeights
val localNewMeans = newMeans
val localNewCovs = newCovs
i = 0
while (i < k) {
prob(i) /= probSum
localNewWeights(i) += prob(i)
BLAS.axpy(prob(i), instance, localNewMeans(i))
BLAS.spr(prob(i), instance, localNewCovs(i))
i += 1
}

totalCnt += 1
this
}
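
The update above is the E-step accumulation. A sketch of the math, where the responsibilities come from the previous iteration's parameters and the EPSILON term keeps probSum strictly positive:

```latex
r_{ik} = \frac{w_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k) + \epsilon}
              {\sum_{j=1}^{K} \left( w_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j) + \epsilon \right)}
% Per instance: newWeights_k += r_{ik}; newMeans_k += r_{ik} x_i;
% newCovs_k += r_{ik} x_i x_i^T (packed); newLogLikelihood += log(probSum).
```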

/**
* Merge another ExpectationAggregator, update the weights, means and covariances
* for each distribution, and update the log likelihood.
* (Note that this is an in-place merge; as a result, `this` object will be modified.)
*
* @param other The other ExpectationAggregator to be merged.
* @return This ExpectationAggregator object.
*/
def merge(other: ExpectationAggregator): this.type = {
if (other.count != 0) {
totalCnt += other.totalCnt

val localThisNewWeights = this.newWeights

Member:
Why do you need to make these local copies?

Contributor Author (yanboliang, Jan 6, 2017):
This is because we are actually calling a getter method when we access this.newWeights. To improve the performance of the loop at L655, we should use explicit references to the values rather than calling the getter each time. It's probably not a big deal in this case since k is usually not very big, but I don't think it's a bad idea.

Member:
Oh, I see. I'm not sure about this either. Does the JIT compiler adjust enough to make it efficient? The way it is seems fine though.

Contributor:
Yeah, I don't think it's necessary here because this is inside the merge operation, which will be called far less than the add operation. It may be overkill in any event, but I'm also ok leaving it.

val localOtherNewWeights = other.newWeights
val localThisNewMeans = this.newMeans
val localOtherNewMeans = other.newMeans
val localThisNewCovs = this.newCovs
val localOtherNewCovs = other.newCovs
var i = 0
while (i < k) {
localThisNewWeights(i) += localOtherNewWeights(i)
BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))
i += 1
}
newLogLikelihood += other.newLogLikelihood
}
this
}
}
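
Merging works because every accumulated quantity is a sum over disjoint sets of instances, so partial aggregates simply add; a sketch for two partitions A and B:

```latex
\sum_{i \in A \cup B} r_{ik} = \sum_{i \in A} r_{ik} + \sum_{i \in B} r_{ik}
% and likewise for \sum_i r_{ik} x_i, \sum_i r_{ik} x_i x_i^{\top} (packed),
% and the per-instance log-likelihood terms.
```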

/**