Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.IndexedSeq

import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix, diag, Transpose}
import org.apache.spark.rdd.RDD

import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors, DenseVector, DenseMatrix, BLAS}
import org.apache.spark.mllib.stat.impl.MultivariateGaussian
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -134,7 +135,7 @@ class GaussianMixtureEM private (
// derived from the samples
val (weights, gaussians) = initialModel match {
case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map { case(mu, sigma) =>
new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix)
new MultivariateGaussian(mu, sigma)
})

case None => {
Expand Down Expand Up @@ -176,8 +177,8 @@ class GaussianMixtureEM private (
}

// Need to convert the breeze matrices to MLlib matrices
val means = Array.tabulate(k) { i => Vectors.fromBreeze(gaussians(i).mu) }
val sigmas = Array.tabulate(k) { i => Matrices.fromBreeze(gaussians(i).sigma) }
val means = Array.tabulate(k) { i => gaussians(i).mu }
val sigmas = Array.tabulate(k) { i => gaussians(i).sigma }
new GaussianMixtureModel(weights, means, sigmas)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import breeze.linalg.{DenseVector => BreezeVector}

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.impl.MultivariateGaussian
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.util.MLUtils

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.mllib.stat.impl
package org.apache.spark.mllib.stat.distribution

import breeze.linalg.{DenseVector => DBV, DenseMatrix => DBM, diag, max, eigSym}

import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.mllib.linalg.{Vectors, Vector, Matrices, Matrix}
import org.apache.spark.mllib.util.MLUtils

/**
* :: DeveloperApi ::
* This class provides basic functionality for a Multivariate Gaussian (Normal) Distribution. In
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add :: DeveloperApi :: before This class .... We need it to generate the doc correctly. You can check other @DeveloperApi usage as examples.

* the event that the covariance matrix is singular, the density will be computed in a
* reduced dimensional subspace under which the distribution is supported.
Expand All @@ -30,33 +33,64 @@ import org.apache.spark.mllib.util.MLUtils
* @param mu The mean vector of the distribution
* @param sigma The covariance matrix of the distribution
*/
private[mllib] class MultivariateGaussian(
val mu: DBV[Double],
val sigma: DBM[Double]) extends Serializable {
@DeveloperApi
class MultivariateGaussian (
val mu: Vector,
val sigma: Matrix) extends Serializable {

require(sigma.numCols == sigma.numRows, "Covariance matrix must be square")
require(mu.size == sigma.numCols, "Mean vector length must match covariance matrix size")

private val breezeMu = mu.toBreeze.toDenseVector

/**
* private[mllib] constructor
*
* @param mu The mean vector of the distribution
* @param sigma The covariance matrix of the distribution
*/
private[mllib] def this(mu: DBV[Double], sigma: DBM[Double]) = {
this(Vectors.fromBreeze(mu), Matrices.fromBreeze(sigma))
}

/**
* Compute distribution dependent constants:
* rootSigmaInv = D^(-1/2) * U, where sigma = U * D * U.t
* u = (2*pi)^(-k/2) * det(sigma)^(-1/2)
* rootSigmaInv = D^(-1/2)^ * U, where sigma = U * D * U.t
* u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^)
*/
private val (rootSigmaInv: DBM[Double], u: Double) = calculateCovarianceConstants

/** Returns density of this multivariate Gaussian at given point, x */
def pdf(x: DBV[Double]): Double = {
val delta = x - mu
def pdf(x: Vector): Double = {
pdf(x.toBreeze.toDenseVector)
}

/** Returns the log-density of this multivariate Gaussian at given point, x */
def logpdf(x: Vector): Double = {
logpdf(x.toBreeze.toDenseVector)
}

/** Returns density of this multivariate Gaussian at given point, x */
private[mllib] def pdf(x: DBV[Double]): Double = {
math.exp(logpdf(x))
}

/** Returns the log-density of this multivariate Gaussian at given point, x */
private[mllib] def logpdf(x: DBV[Double]): Double = {
val delta = x - breezeMu
val v = rootSigmaInv * delta
u * math.exp(v.t * v * -0.5)
u + v.t * v * -0.5
}

/**
* Calculate distribution dependent components used for the density function:
* pdf(x) = (2*pi)^(-k/2) * det(sigma)^(-1/2) * exp( (-1/2) * (x-mu).t * inv(sigma) * (x-mu) )
* pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))
* where k is length of the mean vector.
*
* We here compute distribution-fixed parts
* (2*pi)^(-k/2) * det(sigma)^(-1/2)
* log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^)
* and
* D^(-1/2) * U, where sigma = U * D * U.t
* D^(-1/2)^ * U, where sigma = U * D * U.t
*
* Both the determinant and the inverse can be computed from the singular value decomposition
* of sigma. Noting that covariance matrices are always symmetric and positive semi-definite,
Expand All @@ -65,33 +99,33 @@ private[mllib] class MultivariateGaussian(
*
* sigma = U * D * U.t
* inv(Sigma) = U * inv(D) * U.t
* = (D^{-1/2} * U).t * (D^{-1/2} * U)
* = (D^{-1/2}^ * U).t * (D^{-1/2}^ * U)
*
* and thus
*
* -0.5 * (x-mu).t * inv(Sigma) * (x-mu) = -0.5 * norm(D^{-1/2} * U * (x-mu))^2
* -0.5 * (x-mu).t * inv(Sigma) * (x-mu) = -0.5 * norm(D^{-1/2}^ * U * (x-mu))^2^
*
* To guard against singular covariance matrices, this method computes both the
* pseudo-determinant and the pseudo-inverse (Moore-Penrose). Singular values are considered
* to be non-zero only if they exceed a tolerance based on machine precision, matrix size, and
* relation to the maximum singular value (same tolerance used by, e.g., Octave).
*/
private def calculateCovarianceConstants: (DBM[Double], Double) = {
val eigSym.EigSym(d, u) = eigSym(sigma) // sigma = u * diag(d) * u.t
val eigSym.EigSym(d, u) = eigSym(sigma.toBreeze.toDenseMatrix) // sigma = u * diag(d) * u.t

// For numerical stability, values are considered to be non-zero only if they exceed tol.
// This prevents any inverted value from exceeding (eps * n * max(d))^-1
val tol = MLUtils.EPSILON * max(d) * d.length

try {
// pseudo-determinant is product of all non-zero singular values
val pdetSigma = d.activeValuesIterator.filter(_ > tol).reduce(_ * _)
// log(pseudo-determinant) is sum of the logs of all non-zero singular values
val logPseudoDetSigma = d.activeValuesIterator.filter(_ > tol).map(math.log).sum

// calculate the root-pseudo-inverse of the diagonal matrix of singular values
// by inverting the square root of all non-zero values
val pinvS = diag(new DBV(d.map(v => if (v > tol) math.sqrt(1.0 / v) else 0.0).toArray))

(pinvS * u, math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(pdetSigma, -0.5))
(pinvS * u, -0.5 * (mu.size * math.log(2.0 * math.Pi) + logPseudoDetSigma))
} catch {
case uex: UnsupportedOperationException =>
throw new IllegalArgumentException("Covariance matrix has no non-zero singular values")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,53 @@
* limitations under the License.
*/

package org.apache.spark.mllib.stat.impl
package org.apache.spark.mllib.stat.distribution

import org.scalatest.FunSuite

import breeze.linalg.{ DenseVector => BDV, DenseMatrix => BDM }

import org.apache.spark.mllib.linalg.{ Vectors, Matrices }
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._

class MultivariateGaussianSuite extends FunSuite with MLlibTestSparkContext {
test("univariate") {
val x1 = new BDV(Array(0.0))
val x2 = new BDV(Array(1.5))
val x1 = Vectors.dense(0.0)
val x2 = Vectors.dense(1.5)

val mu = new BDV(Array(0.0))
val sigma1 = new BDM(1, 1, Array(1.0))
val mu = Vectors.dense(0.0)
val sigma1 = Matrices.dense(1, 1, Array(1.0))
val dist1 = new MultivariateGaussian(mu, sigma1)
assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)

val sigma2 = new BDM(1, 1, Array(4.0))
val sigma2 = Matrices.dense(1, 1, Array(4.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
}

test("multivariate") {
val x1 = new BDV(Array(0.0, 0.0))
val x2 = new BDV(Array(1.0, 1.0))
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)

val mu = new BDV(Array(0.0, 0.0))
val sigma1 = new BDM(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val mu = Vectors.dense(0.0, 0.0)
val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val dist1 = new MultivariateGaussian(mu, sigma1)
assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)

val sigma2 = new BDM(2, 2, Array(4.0, -1.0, -1.0, 2.0))
val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
val dist2 = new MultivariateGaussian(mu, sigma2)
assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
}

test("multivariate degenerate") {
val x1 = new BDV(Array(0.0, 0.0))
val x2 = new BDV(Array(1.0, 1.0))
val x1 = Vectors.dense(0.0, 0.0)
val x2 = Vectors.dense(1.0, 1.0)

val mu = new BDV(Array(0.0, 0.0))
val sigma = new BDM(2, 2, Array(1.0, 1.0, 1.0, 1.0))
val mu = Vectors.dense(0.0, 0.0)
val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
val dist = new MultivariateGaussian(mu, sigma)
assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
Expand Down