From 2ed94de769d2b06e2a81e6cd320637118874047d Mon Sep 17 00:00:00 2001 From: sethah Date: Fri, 20 Jan 2017 09:13:39 -0800 Subject: [PATCH 1/3] numFeatures check --- .../spark/ml/clustering/GaussianMixture.scala | 14 +++++++++++--- .../spark/mllib/clustering/GaussianMixture.scala | 15 ++++++++++++--- .../ml/clustering/GaussianMixtureSuite.scala | 13 +++++++++++++ .../mllib/clustering/GaussianMixtureSuite.scala | 11 +++++++++++ 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index db5fff5af86e..b3ea1db70d3e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -278,7 +278,9 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] { * While this process is generally guaranteed to converge, it is not guaranteed * to find a global optimum. * - * @note For high-dimensional data (with many features), this algorithm may perform poorly. + * @note This algorithm is limited in its number of features since it requires storing a covariance + * matrix which has size quadratic in the number of features. Even when the number of features does + * not exceed this limit, this algorithm may perform poorly on high-dimensional data. * This is due to high-dimensional data (a) making it difficult to cluster at all (based * on statistical/theoretical arguments) and (b) numerical issues with Gaussian distributions. */ @@ -344,6 +346,9 @@ class GaussianMixture @Since("2.0.0") ( // Extract the number of features. val numFeatures = instances.first().size + require(numFeatures < GaussianMixture.MAX_NUM_FEATURES, s"GaussianMixture cannot handle more " + + s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" + + s" matrix is quadratic in the number of features.") val instr = Instrumentation.create(this, instances) instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol) @@ -391,8 +396,8 @@ class GaussianMixture @Since("2.0.0") ( val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, cov, weight) => GaussianMixture.updateWeightsAndGaussians(mean, cov, weight, sumWeights) }.collect().unzip - Array.copy(ws.toArray, 0, weights, 0, ws.length) - Array.copy(gs.toArray, 0, gaussians, 0, gs.length) + Array.copy(ws, 0, weights, 0, ws.length) + Array.copy(gs, 0, gaussians, 0, gs.length) } else { var i = 0 while (i < numClusters) { @@ -486,6 +491,9 @@ class GaussianMixture @Since("2.0.0") ( @Since("2.0.0") object GaussianMixture extends DefaultParamsReadable[GaussianMixture] { + /** Limit number of features such that numFeatures^2^ < Integer.MaxValue */ + private[clustering] val MAX_NUM_FEATURES = 46000 + @Since("2.0.0") override def load(path: String): GaussianMixture = super.load(path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index 10bd8468b35c..9d8b73f0a46d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -46,7 +46,9 @@ import org.apache.spark.util.Utils * is considered to have occurred. * @param maxIterations Maximum number of iterations allowed. * - * @note For high-dimensional data (with many features), this algorithm may perform poorly. + * @note This algorithm is limited in its number of features since it requires storing a covariance + * matrix which has size quadratic in the number of features. Even when the number of features does + * not exceed this limit, this algorithm may perform poorly on high-dimensional data. * This is due to high-dimensional data (a) making it difficult to cluster at all (based * on statistical/theoretical arguments) and (b) numerical issues with Gaussian distributions. */ @@ -170,6 +172,9 @@ class GaussianMixture private ( // Get length of the input vectors val d = breezeData.first().length + require(d < GaussianMixture.MAX_NUM_FEATURES, s"GaussianMixture cannot handle more " + + s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" + + s" matrix is quadratic in the number of features.") val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(k, d) @@ -211,8 +216,8 @@ class GaussianMixture private ( val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) => updateWeightsAndGaussians(mean, sigma, weight, sumWeights) }.collect().unzip - Array.copy(ws.toArray, 0, weights, 0, ws.length) - Array.copy(gs.toArray, 0, gaussians, 0, gs.length) + Array.copy(ws, 0, weights, 0, ws.length) + Array.copy(gs, 0, gaussians, 0, gs.length) } else { var i = 0 while (i < k) { @@ -272,6 +277,10 @@ class GaussianMixture private ( } private[clustering] object GaussianMixture { + + /** Limit number of features such that numFeatures^2^ < Integer.MaxValue */ + private[clustering] val MAX_NUM_FEATURES = 46000 + /** * Heuristic to distribute the computation of the `MultivariateGaussian`s, approximately when * d is greater than 25 except for when k is very small. diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index e54eb2750c38..a0f958721bc0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -53,6 +53,19 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext rDataset = rData.map(FeatureData).toDF() } + test("gmm fails on high dimensional data") { + val ctx = spark.sqlContext + import ctx.implicits._ + val df = Seq( + Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(0, 4), Array(3.0, 8.0)), + Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(1, 5), Array(4.0, 9.0))) + .map(Tuple1.apply).toDF("features") + val gm = new GaussianMixture() + intercept[IllegalArgumentException] { + gm.fit(df) + } + } + test("default parameters") { val gm = new GaussianMixture() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala index 67e680be7330..7deaede37538 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala @@ -25,6 +25,17 @@ import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.util.Utils class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext { + + test("gmm fails on high dimensional data") { + val rdd = sc.parallelize(Seq( + Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(0, 4), Array(3.0, 8.0)), + Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(1, 5), Array(4.0, 9.0)))) + val gm = new GaussianMixture() + intercept[IllegalArgumentException] { + gm.run(rdd) + } + } + test("single cluster") { val data = sc.parallelize(Array( Vectors.dense(6.0, 9.0), From 51a237b001a2e9a9257346b061fbf25bd63dc820 Mon Sep 17 00:00:00 2001 From: sethah Date: Mon, 23 Jan 2017 14:10:41 -0800 Subject: [PATCH 2/3] address review --- .../spark/ml/clustering/GaussianMixtureSuite.scala | 9 +++++---- .../spark/mllib/clustering/GaussianMixtureSuite.scala | 7 +++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index a0f958721bc0..c500c5b3e365 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -54,15 +54,16 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext } test("gmm fails on high dimensional data") { - val ctx = spark.sqlContext - import ctx.implicits._ val df = Seq( Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(0, 4), Array(3.0, 8.0)), Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(1, 5), Array(4.0, 9.0))) .map(Tuple1.apply).toDF("features") val gm = new GaussianMixture() - intercept[IllegalArgumentException] { - gm.fit(df) + withClue(s"GMM should restrict the maximum number of features to be < " + + s"${GaussianMixture.MAX_NUM_FEATURES}") { + intercept[IllegalArgumentException] { + gm.fit(df) + } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala index 7deaede37538..11189d8bd477 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/GaussianMixtureSuite.scala @@ -31,8 +31,11 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext { Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(0, 4), Array(3.0, 8.0)), Vectors.sparse(GaussianMixture.MAX_NUM_FEATURES + 1, Array(1, 5), Array(4.0, 9.0)))) val gm = new GaussianMixture() - intercept[IllegalArgumentException] { - gm.run(rdd) + withClue(s"GMM should restrict the maximum number of features to be < " + + s"${GaussianMixture.MAX_NUM_FEATURES}") { + intercept[IllegalArgumentException] { + gm.run(rdd) + } } } From 5672d1345f661665f521fd1dd4410313ef3ab554 Mon Sep 17 00:00:00 2001 From: sethah Date: Tue, 24 Jan 2017 07:59:33 -0800 Subject: [PATCH 3/3] exact threshold --- .../org/apache/spark/ml/clustering/GaussianMixture.scala | 4 ++-- .../org/apache/spark/mllib/clustering/GaussianMixture.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index b3ea1db70d3e..ea2dc6cfd8d3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -491,8 +491,8 @@ class GaussianMixture @Since("2.0.0") ( @Since("2.0.0") object GaussianMixture extends DefaultParamsReadable[GaussianMixture] { - /** Limit number of features such that numFeatures^2^ < Integer.MaxValue */ - private[clustering] val MAX_NUM_FEATURES = 46000 + /** Limit number of features such that numFeatures^2^ < Int.MaxValue */ + private[clustering] val MAX_NUM_FEATURES = math.sqrt(Int.MaxValue).toInt @Since("2.0.0") override def load(path: String): GaussianMixture = super.load(path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index 9d8b73f0a46d..051ec2404fb6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -278,8 +278,8 @@ class GaussianMixture private ( private[clustering] object GaussianMixture { - /** Limit number of features such that numFeatures^2^ < Integer.MaxValue */ - private[clustering] val MAX_NUM_FEATURES = 46000 + /** Limit number of features such that numFeatures^2^ < Int.MaxValue */ + private[clustering] val MAX_NUM_FEATURES = math.sqrt(Int.MaxValue).toInt /** * Heuristic to distribute the computation of the `MultivariateGaussian`s, approximately when