Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
* While this process is generally guaranteed to converge, it is not guaranteed
* to find a global optimum.
*
* @note For high-dimensional data (with many features), this algorithm may perform poorly.
* @note This algorithm is limited in its number of features since it requires storing a covariance
* matrix which has size quadratic in the number of features. Even when the number of features does
* not exceed this limit, this algorithm may perform poorly on high-dimensional data.
* This is due to high-dimensional data (a) making it difficult to cluster at all (based
* on statistical/theoretical arguments) and (b) causing numerical issues with Gaussian
* distributions.
*/
Expand Down Expand Up @@ -344,6 +346,9 @@ class GaussianMixture @Since("2.0.0") (

// Extract the number of features.
val numFeatures = instances.first().size
// The bound is exclusive: an input with exactly MAX_NUM_FEATURES features is rejected as well,
// so the message says "or more" instead of "more than" to match the predicate.
require(numFeatures < GaussianMixture.MAX_NUM_FEATURES, "GaussianMixture cannot handle " +
  s"${GaussianMixture.MAX_NUM_FEATURES} or more features because the size of the covariance" +
  " matrix is quadratic in the number of features.")

val instr = Instrumentation.create(this, instances)
instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
Expand Down Expand Up @@ -391,8 +396,8 @@ class GaussianMixture @Since("2.0.0") (
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, cov, weight) =>
GaussianMixture.updateWeightsAndGaussians(mean, cov, weight, sumWeights)
}.collect().unzip
Array.copy(ws.toArray, 0, weights, 0, ws.length)
Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
Array.copy(ws, 0, weights, 0, ws.length)
Array.copy(gs, 0, gaussians, 0, gs.length)
} else {
var i = 0
while (i < numClusters) {
Expand Down Expand Up @@ -486,6 +491,9 @@ class GaussianMixture @Since("2.0.0") (
@Since("2.0.0")
object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {

/**
 * Limit number of features such that numFeatures^2^ < Int.MaxValue.
 *
 * Computed as the square root of Int.MaxValue truncated to an Int. The feature-count check
 * in this file compares with a strict `<`, so this value acts as an exclusive upper bound:
 * an input with exactly this many features is rejected.
 */
private[clustering] val MAX_NUM_FEATURES = math.sqrt(Int.MaxValue).toInt

@Since("2.0.0")
override def load(path: String): GaussianMixture = super.load(path)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ import org.apache.spark.util.Utils
* is considered to have occurred.
* @param maxIterations Maximum number of iterations allowed.
*
* @note For high-dimensional data (with many features), this algorithm may perform poorly.
* @note This algorithm is limited in its number of features since it requires storing a covariance
* matrix which has size quadratic in the number of features. Even when the number of features does
* not exceed this limit, this algorithm may perform poorly on high-dimensional data.
* This is due to high-dimensional data (a) making it difficult to cluster at all (based
* on statistical/theoretical arguments) and (b) causing numerical issues with Gaussian
* distributions.
*/
Expand Down Expand Up @@ -170,6 +172,9 @@ class GaussianMixture private (

// Get length of the input vectors
val d = breezeData.first().length
// The bound is exclusive: an input with exactly MAX_NUM_FEATURES features is rejected as well,
// so the message says "or more" instead of "more than" to match the predicate.
require(d < GaussianMixture.MAX_NUM_FEATURES, "GaussianMixture cannot handle " +
  s"${GaussianMixture.MAX_NUM_FEATURES} or more features because the size of the covariance" +
  " matrix is quadratic in the number of features.")

val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(k, d)

Expand Down Expand Up @@ -211,8 +216,8 @@ class GaussianMixture private (
val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
}.collect().unzip
Array.copy(ws.toArray, 0, weights, 0, ws.length)
Array.copy(gs.toArray, 0, gaussians, 0, gs.length)
Array.copy(ws, 0, weights, 0, ws.length)
Array.copy(gs, 0, gaussians, 0, gs.length)
} else {
var i = 0
while (i < k) {
Expand Down Expand Up @@ -272,6 +277,10 @@ class GaussianMixture private (
}

private[clustering] object GaussianMixture {

/**
 * Limit number of features such that numFeatures^2^ < Int.MaxValue.
 *
 * Computed as the square root of Int.MaxValue truncated to an Int. The input-length check
 * in this file compares with a strict `<`, so this value acts as an exclusive upper bound:
 * an input with exactly this many features is rejected.
 */
private[clustering] val MAX_NUM_FEATURES = math.sqrt(Int.MaxValue).toInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number is not equal to that used in computeCovariance() in mllib.linalg.distributed.RowMatrix.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L327
Do the limits in mllib.linalg.distributed.RowMatrix need to be updated to this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the limiting factor here is that we can't have an array of elements somewhere that has more than 2^31 - 1 elements. For a dense representation of a normal n x n matrix, that limits n to 46340. Here, however, the matrix is a symmetric Gramian matrix that needs n(n+1)/2 elements of storage, so 65535 works.


/**
* Heuristic to distribute the computation of the `MultivariateGaussian`s, approximately when
* d is greater than 25 except for when k is very small.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
rDataset = rData.map(FeatureData).toDF()
}

test("gmm fails on high dimensional data") {
  // fit() requires numFeatures < MAX_NUM_FEATURES, so the bound itself is the smallest
  // dimensionality that must be rejected. Checking it alongside bound + 1 pins the exact
  // boundary; an accidental change from `<` to `<=` would otherwise go unnoticed.
  // Sparse vectors keep the fixture tiny despite the huge nominal dimension.
  val rejectedSizes = Seq(GaussianMixture.MAX_NUM_FEATURES, GaussianMixture.MAX_NUM_FEATURES + 1)
  rejectedSizes.foreach { numFeatures =>
    val df = Seq(
      Vectors.sparse(numFeatures, Array(0, 4), Array(3.0, 8.0)),
      Vectors.sparse(numFeatures, Array(1, 5), Array(4.0, 9.0)))
      .map(Tuple1.apply).toDF("features")
    val gm = new GaussianMixture()
    withClue("GMM should restrict the maximum number of features to be < " +
      s"${GaussianMixture.MAX_NUM_FEATURES}") {
      intercept[IllegalArgumentException] {
        gm.fit(df)
      }
    }
  }
}

test("default parameters") {
val gm = new GaussianMixture()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils

class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext {

test("gmm fails on high dimensional data") {
  // run() requires the input length to be < MAX_NUM_FEATURES, so the bound itself is the
  // smallest dimensionality that must be rejected. Checking it alongside bound + 1 pins the
  // exact boundary; an accidental change from `<` to `<=` would otherwise go unnoticed.
  // Sparse vectors keep the fixture tiny despite the huge nominal dimension.
  val rejectedSizes = Seq(GaussianMixture.MAX_NUM_FEATURES, GaussianMixture.MAX_NUM_FEATURES + 1)
  rejectedSizes.foreach { numFeatures =>
    val rdd = sc.parallelize(Seq(
      Vectors.sparse(numFeatures, Array(0, 4), Array(3.0, 8.0)),
      Vectors.sparse(numFeatures, Array(1, 5), Array(4.0, 9.0))))
    val gm = new GaussianMixture()
    withClue("GMM should restrict the maximum number of features to be < " +
      s"${GaussianMixture.MAX_NUM_FEATURES}") {
      intercept[IllegalArgumentException] {
        gm.run(rdd)
      }
    }
  }
}

test("single cluster") {
val data = sc.parallelize(Array(
Vectors.dense(6.0, 9.0),
Expand Down