diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 3d3e7a22e594..368f177cda82 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -271,7 +271,7 @@ private[spark] object BLAS extends Serializable {
   }
 
   /**
-   * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR.
+   * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR.
    *
    * @param U the upper triangular part of the matrix packed in an array (column major)
    */
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
index 42746b572702..a08b8af0fcbf 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala
@@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") (
    */
  @transient private lazy val tuple = {
     val (rootSigmaInv, u) = calculateCovarianceConstants
-    val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv)
+    val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense
     val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean)
     (rootSigmaInvMat, u, rootSigmaInvMulMu)
   }
@@ -81,6 +81,36 @@ class MultivariateGaussian @Since("2.0.0") (
     u - 0.5 * BLAS.dot(v, v)
   }
 
+  private[ml] def pdf(X: Matrix): DenseVector = {
+    val mat = DenseMatrix.zeros(X.numRows, X.numCols)
+    pdf(X, mat)
+  }
+
+  private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = {
+    require(!mat.isTransposed)
+
+    BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat)
+    val m = mat.numRows
+    val n = mat.numCols
+
+    val pdfVec = mat.multiply(rootSigmaInvMulMu)
+
+    val blas = BLAS.getBLAS(n)
+    val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1)
+
+    val localU = u
+    var i = 0
+    while (i < m) {
+      val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m)
+      val dot = pdfVec(i)
+      val squaredSum = squared1 + squared2 - dot - dot
+      pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum)
+      i += 1
+    }
+
+    pdfVec
+  }
+
   /**
    * Calculate distribution dependent components used for the density function:
    * pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu))
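The batched pdf(X, mat) above relies on an algebraic expansion instead of forming per-row differences: with R = rootSigmaInv, logpdf(x) = u - 0.5 * ||R*x - R*mu||^2, and the squared norm expands to ||R*mu||^2 + ||R*x||^2 - 2*(R*x).(R*mu), which is exactly squared1 + squared2 - dot - dot in the loop. A minimal plain-Scala sketch of the identity (the two arrays are hypothetical stand-ins for one row of `mat` and for `rootSigmaInvMulMu`):

    // rx stands in for one row of `mat` (that is, R*x); rmu for rootSigmaInvMulMu (R*mu).
    val rx = Array(0.3, -1.2)
    val rmu = Array(0.1, 0.4)
    // Direct evaluation of ||R*x - R*mu||^2 ...
    val direct = rx.zip(rmu).map { case (a, b) => (a - b) * (a - b) }.sum
    // ... versus the expansion used in the loop: squared1 + squared2 - 2 * dot.
    val squared1 = rmu.map(v => v * v).sum                  // computed once per batch
    val squared2 = rx.map(v => v * v).sum                   // one ddot per row
    val dot = rx.zip(rmu).map { case (a, b) => a * b }.sum  // pdfVec(i)
    assert(math.abs(direct - (squared1 + squared2 - 2 * dot)) < 1e-12)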
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
index f2ecff1cc58b..8652d317a85c 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala
@@ -27,6 +27,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
   test("univariate") {
     val x1 = Vectors.dense(0.0)
     val x2 = Vectors.dense(1.5)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0)
     val sigma1 = Matrices.dense(1, 1, Array(1.0))
@@ -35,6 +36,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5)
     assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5)
     assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5)
+    assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5)
 
     val sigma2 = Matrices.dense(1, 1, Array(4.0))
     val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -42,11 +44,13 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5)
     assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5)
     assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5)
+    assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5)
   }
 
   test("multivariate") {
     val x1 = Vectors.dense(0.0, 0.0)
     val x2 = Vectors.dense(1.0, 1.0)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0, 0.0)
     val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
@@ -55,6 +59,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5)
     assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5)
     assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5)
+    assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5)
 
     val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0))
     val dist2 = new MultivariateGaussian(mu, sigma2)
@@ -62,21 +67,25 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5)
     assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5)
     assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5)
+    assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5)
   }
 
   test("multivariate degenerate") {
     val x1 = Vectors.dense(0.0, 0.0)
     val x2 = Vectors.dense(1.0, 1.0)
+    val mat = Matrices.fromVectors(Seq(x1, x2))
 
     val mu = Vectors.dense(0.0, 0.0)
     val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0))
     val dist = new MultivariateGaussian(mu, sigma)
     assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5)
     assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5)
+    assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5)
   }
 
   test("SPARK-11302") {
     val x = Vectors.dense(629, 640, 1.7188, 618.19)
+    val mat = Matrices.fromVectors(Seq(x))
     val mu = Vectors.dense(
       1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697)
     val sigma = Matrices.dense(4, 4, Array(
@@ -87,5 +96,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite {
     val dist = new MultivariateGaussian(mu, sigma)
     // Agrees with R's dmvnorm: 7.154782e-05
     assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9)
+    assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5)
   }
 }
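The GaussianMixture changes below hinge on stacking consecutive instance vectors into matrix blocks so the E-step can use GEMM. A minimal local sketch of that stacking (it mirrors the `iter.grouped(blockSize)` plus `Matrices.fromVectors` step in `trainOnBlocks` further down; the vectors are hypothetical, and note that `fromVectors` may be package-private, `private[ml]`, depending on the Spark version):

    import org.apache.spark.ml.linalg.{Matrices, Vectors}

    val instances = Seq(
      Vectors.dense(1.0, 2.0),
      Vectors.dense(3.0, 4.0),
      Vectors.dense(5.0, 6.0))
    // Each block is a row-major matrix whose i-th row is the i-th vector of the
    // group; a trailing partial group simply yields a smaller block (here 1 x 2).
    val blocks = instances.grouped(2).map(Matrices.fromVectors).toSeq
    assert(blocks.map(_.numRows) == Seq(2, 1))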
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index 1c4560aa5fdd..6d4137b638dc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol
   with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol
-  with HasAggregationDepth {
+  with HasAggregationDepth with HasBlockSize {
 
   /**
    * Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2.
@@ -279,8 +279,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
    * @param weights Weights for each Gaussian
    * @return Probability (partial assignment) for each of the k clusters
    */
-  private[clustering]
-  def computeProbabilities(
+  private[clustering] def computeProbabilities(
       features: Vector,
       dists: Array[MultivariateGaussian],
       weights: Array[Double]): Array[Double] = {
@@ -375,6 +374,25 @@ class GaussianMixture @Since("2.0.0") (
   @Since("3.0.0")
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+   * if blockSize > 1, then vectors will be stacked into blocks, and high-level BLAS routines
+   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
+   * on the sparsity and dimension of the input dataset, the underlying BLAS implementation
+   * (for example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (such as its thread count).
+   * Note that existing BLAS implementations are mainly optimized for dense matrices; if the
+   * input dataset is sparse, stacking may bring no performance gain and may even cause a
+   * performance regression.
+   * Default is 1.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.1.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  setDefault(blockSize -> 1)
+
   /**
    * Number of samples per cluster to use when initializing Gaussians.
    */
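A hypothetical usage sketch for the new expert param (the training DataFrame `df` and the chosen sizes are assumptions for illustration, not part of this change):

    import org.apache.spark.ml.clustering.GaussianMixture

    val gmm = new GaussianMixture()
      .setK(3)
      .setMaxIter(100)
      .setBlockSize(64)    // within the recommended 10..1000 range; enables GEMM-based E-steps
    val model = gmm.fit(df)  // `df` is a hypothetical DataFrame with a "features" vector column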
@@ -392,7 +410,11 @@ class GaussianMixture @Since("2.0.0") (
       s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
       s" matrix is quadratic in the number of features.")
 
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
+      seed, tol, aggregationDepth, blockSize)
+    instr.logNumFeatures(numFeatures)
 
     val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
       col($(weightCol)).cast(DoubleType)
@@ -401,25 +423,50 @@ class GaussianMixture @Since("2.0.0") (
     }
 
     val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w)
-      .as[(Vector, Double)]
-      .rdd
+      .as[(Vector, Double)].rdd
+      .setName("training instances")
 
-    if (handlePersistence) {
+    if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) {
       instances.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
-    val sc = spark.sparkContext
-    val numClusters = $(k)
+    // TODO: SPARK-15785 Support users supplied initial GMM.
+    val (weights, gaussians) = initRandom(instances, $(k), numFeatures)
 
-    instr.logPipelineStage(this)
-    instr.logDataset(dataset)
-    instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter,
-      seed, tol, aggregationDepth)
-    instr.logNumFeatures(numFeatures)
+    val (logLikelihood, iteration) = if ($(blockSize) == 1) {
+      trainOnRows(instances, weights, gaussians, numFeatures, instr)
+    } else {
+      val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean()
+      instr.logNamedValue("sparsity", sparsity.toString)
+      if (sparsity > 0.5) {
+        logWarning(s"sparsity of input dataset is $sparsity, " +
+          s"which may hurt performance when using high-level BLAS routines.")
+      }
+      trainOnBlocks(instances, weights, gaussians, numFeatures, instr)
+    }
+    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
 
-    // TODO: SPARK-15785 Support users supplied initial GMM.
-    val (weights, gaussians) = initRandom(instances, numClusters, numFeatures)
+    val gaussianDists = gaussians.map { case (mean, covVec) =>
+      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+      new MultivariateGaussian(mean, cov)
+    }
+
+    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists))
+      .setParent(this)
+    val summary = new GaussianMixtureSummary(model.transform(dataset),
+      $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
+    instr.logNamedValue("logLikelihood", logLikelihood)
+    instr.logNamedValue("clusterSizes", summary.clusterSizes)
+    model.setSummary(Some(summary))
+  }
+
+  private def trainOnRows(
+      instances: RDD[(Vector, Double)],
+      weights: Array[Double],
+      gaussians: Array[(DenseVector, DenseVector)],
+      numFeatures: Int,
+      instr: Instrumentation): (Double, Int) = {
+    val sc = instances.sparkContext
     var logLikelihood = Double.MinValue
     var logLikelihoodPrev = 0.0
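The sparsity guard above is just one minus the mean density of the input vectors. A small worked example (hypothetical vectors, computed locally rather than over an RDD):

    import org.apache.spark.ml.linalg.Vectors

    val vs = Seq(
      Vectors.sparse(4, Array(0), Array(1.0)),  // nnz / size = 0.25
      Vectors.dense(1.0, 2.0, 3.0, 4.0))        // nnz / size = 1.0
    val sparsity = 1 - vs.map(v => v.numNonzeros.toDouble / v.size).sum / vs.size
    assert(math.abs(sparsity - 0.375) < 1e-12)  // a value > 0.5 would trigger the warning above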
@@ -440,7 +487,7 @@ class GaussianMixture @Since("2.0.0") (
           val ws = agg.weights.sum
           if (iteration == 0) weightSumAccum.add(ws)
           logLikelihoodAccum.add(agg.logLikelihood)
-          Iterator.tabulate(numClusters) { i =>
+          Iterator.tabulate(bcWeights.value.length) { i =>
             (i, (agg.means(i), agg.covs(i), agg.weights(i), ws))
           }
         } else Iterator.empty
@@ -471,21 +518,77 @@ class GaussianMixture @Since("2.0.0") (
       instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
       iteration += 1
     }
-    if (handlePersistence) {
-      instances.unpersist()
-    }
 
-    val gaussianDists = gaussians.map { case (mean, covVec) =>
-      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
-      new MultivariateGaussian(mean, cov)
+    (logLikelihood, iteration)
+  }
+
+  private def trainOnBlocks(
+      instances: RDD[(Vector, Double)],
+      weights: Array[Double],
+      gaussians: Array[(DenseVector, DenseVector)],
+      numFeatures: Int,
+      instr: Instrumentation): (Double, Int) = {
+    val blocks = instances.mapPartitions { iter =>
+      iter.grouped($(blockSize))
+        .map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+    val sc = instances.sparkContext
+    var logLikelihood = Double.MinValue
+    var logLikelihoodPrev = 0.0
+
+    var iteration = 0
+    while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) {
+      val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null
+      val logLikelihoodAccum = sc.doubleAccumulator
+      val bcWeights = sc.broadcast(weights)
+      val bcGaussians = sc.broadcast(gaussians)
+
+      // aggregate the cluster contribution for all sample points,
+      // and then compute the new distributions
+      blocks.mapPartitions { iter =>
+        if (iter.nonEmpty) {
+          val agg = new BlockExpectationAggregator(numFeatures,
+            $(blockSize), bcWeights, bcGaussians)
+          while (iter.hasNext) { agg.add(iter.next) }
+          // sum of weights in this partition
+          val ws = agg.weights.sum
+          if (iteration == 0) weightSumAccum.add(ws)
+          logLikelihoodAccum.add(agg.logLikelihood)
+          agg.meanIter.zip(agg.covIter).zipWithIndex
+            .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) }
+        } else Iterator.empty
+      }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) =>
+        // update the weights, means and covariances for the i-th distribution
+        BLAS.axpy(1.0, mean2, mean1)
+        BLAS.axpy(1.0, cov2, cov1)
+        (mean1, cov1, w1 + w2, ws1 + ws2)
+      }.mapValues { case (mean, cov, w, ws) =>
+        // Create new distributions based on the partial assignments
+        // (often referred to as the "M" step in literature)
+        GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws)
+      }.collect().foreach { case (i, (weight, gaussian)) =>
+        weights(i) = weight
+        gaussians(i) = gaussian
+      }
+
+      bcWeights.destroy()
+      bcGaussians.destroy()
+
+      if (iteration == 0) {
+        instr.logNumExamples(weightSumAccum.count)
+        instr.logSumOfWeights(weightSumAccum.value)
+      }
+
+      logLikelihoodPrev = logLikelihood         // current becomes previous
+      logLikelihood = logLikelihoodAccum.value  // this is the freshly computed log-likelihood
+      instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood)
+      iteration += 1
     }
+    blocks.unpersist()
 
-    val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
-    val summary = new GaussianMixtureSummary(model.transform(dataset),
-      $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration)
-    instr.logNamedValue("logLikelihood", logLikelihood)
-    instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    model.setSummary(Some(summary))
+    (logLikelihood, iteration)
   }
 
   @Since("2.0.0")
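Both aggregators below implement the same per-instance E-step: weighted densities are shifted by EPSILON, normalized into responsibilities, and each instance contributes weight * log(probSum) to the log-likelihood. A self-contained sketch for k = 2 (the density values are hypothetical, and the EPSILON literal is a stand-in for the constant used in this file):

    val EPSILON = 2.220446049250313E-16   // stand-in for the shared machine-epsilon constant
    val mixWeights = Array(0.6, 0.4)      // current mixture weights
    val pdfs = Array(0.12, 0.03)          // pdf_j(x) for one instance x
    val instanceWeight = 1.0
    val probs = Array.tabulate(2) { j => EPSILON + mixWeights(j) * pdfs(j) }
    val probSum = probs.sum
    val resp = probs.map(_ / probSum * instanceWeight)  // soft assignment of x to each cluster
    val llContribution = math.log(probSum) * instanceWeight
    assert(math.abs(resp.sum - instanceWeight) < 1e-12) // responsibilities sum to the weight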
@@ -626,16 +729,15 @@ private class ExpectationAggregator(
     bcWeights: Broadcast[Array[Double]],
     bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
 
-  private val k: Int = bcWeights.value.length
-  private var totalCnt: Long = 0L
-  private var newLogLikelihood: Double = 0.0
-  private lazy val newWeights: Array[Double] = Array.ofDim[Double](k)
-  private lazy val newMeans: Array[DenseVector] = Array.fill(k)(
-    new DenseVector(Array.ofDim[Double](numFeatures)))
-  private lazy val newCovs: Array[DenseVector] = Array.fill(k)(
-    new DenseVector(Array.ofDim[Double](numFeatures * (numFeatures + 1) / 2)))
+  private val k = bcWeights.value.length
+  private var totalCnt = 0L
+  private var newLogLikelihood = 0.0
+  private val covSize = numFeatures * (numFeatures + 1) / 2
+  private lazy val newWeights = Array.ofDim[Double](k)
+  @transient private lazy val newMeans = Array.fill(k)(Vectors.zeros(numFeatures).toDense)
+  @transient private lazy val newCovs = Array.fill(k)(Vectors.zeros(covSize).toDense)
 
-  @transient private lazy val oldGaussians = {
+  @transient private lazy val gaussians = {
     bcGaussians.value.map { case (mean, covVec) =>
       val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
       new MultivariateGaussian(mean, cov)
@@ -656,19 +758,19 @@ private class ExpectationAggregator(
    * Add a new training instance to this ExpectationAggregator, update the weights,
    * means and covariances for each distributions, and update the log likelihood.
    *
-   * @param weightedVector The instance of data point to be added.
+   * @param instance The weighted data point to be added.
    * @return This ExpectationAggregator object.
    */
-  def add(weightedVector: (Vector, Double)): this.type = {
-    val (instance: Vector, weight: Double) = weightedVector
+  def add(instance: (Vector, Double)): this.type = {
+    val (vector: Vector, weight: Double) = instance
     val localWeights = bcWeights.value
-    val localOldGaussians = oldGaussians
+    val localGaussians = gaussians
 
     val prob = new Array[Double](k)
     var probSum = 0.0
     var i = 0
     while (i < k) {
-      val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
+      val p = EPSILON + localWeights(i) * localGaussians(i).pdf(vector)
       prob(i) = p
       probSum += p
       i += 1
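covSize = numFeatures * (numFeatures + 1) / 2 above reflects the packed storage shared by BLAS.spr, dspr and unpackUpperTriangularMatrix: the upper triangle is laid out column by column, so entry (i, j) with i <= j lives at index j * (j + 1) / 2 + i. A small sketch of that layout (the matrix values are hypothetical):

    val n = 3
    val covSize = n * (n + 1) / 2                      // = 6 packed slots
    def packedIndex(i: Int, j: Int): Int = j * (j + 1) / 2 + i
    // Packed columns: (0,0) | (0,1), (1,1) | (0,2), (1,2), (2,2)
    val packed = Array(1.0, 2.0, 4.0, 3.0, 5.0, 6.0)
    assert(packed(packedIndex(0, 2)) == 3.0)           // A(0, 2)
    assert(packed(packedIndex(1, 1)) == 4.0)           // A(1, 1)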
@@ -682,42 +784,128 @@
     while (i < k) {
       val w = prob(i) / probSum * weight
       localNewWeights(i) += w
-      BLAS.axpy(w, instance, localNewMeans(i))
-      BLAS.spr(w, instance, localNewCovs(i))
+      BLAS.axpy(w, vector, localNewMeans(i))
+      BLAS.spr(w, vector, localNewCovs(i))
       i += 1
     }
 
     totalCnt += 1
     this
   }
+}
+
+
+/**
+ * BlockExpectationAggregator computes the partial expectation results.
+ *
+ * @param numFeatures The number of features.
+ * @param blockSize The number of instances stacked in each block.
+ * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
+ * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
+ *                    in the mixture. Note only upper triangular part of the covariance
+ *                    matrix of each distribution is stored as dense vector (column major)
+ *                    in order to reduce shuffled data size.
+ */
+private class BlockExpectationAggregator(
+    numFeatures: Int,
+    blockSize: Int,
+    bcWeights: Broadcast[Array[Double]],
+    bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
+
+  private val k = bcWeights.value.length
+  private var totalCnt = 0L
+  private var newLogLikelihood = 0.0
+  private val covSize = numFeatures * (numFeatures + 1) / 2
+  private lazy val newWeights = Array.ofDim[Double](k)
+  @transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k)
+  @transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k)
+  @transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k)
+  @transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures)
+  @transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense
+
+  @transient private lazy val gaussians = {
+    bcGaussians.value.map { case (mean, covVec) =>
+      val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)
+      new MultivariateGaussian(mean, cov)
+    }
+  }
+
+  def count: Long = totalCnt
+
+  def logLikelihood: Double = newLogLikelihood
+
+  def weights: Array[Double] = newWeights
+
+  def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense)
+
+  def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense)
 
   /**
-   * Merge another ExpectationAggregator, update the weights, means and covariances
-   * for each distributions, and update the log likelihood.
-   * (Note that it's in place merging; as a result, `this` object will be modified.)
+   * Add a new training instance block to this BlockExpectationAggregator, update the weights,
+   * means and covariances for each distribution, and update the log likelihood.
    *
-   * @param other The other ExpectationAggregator to be merged.
-   * @return This ExpectationAggregator object.
+   * @param block The block of instances to be added.
+   * @return This BlockExpectationAggregator object.
    */
-  def merge(other: ExpectationAggregator): this.type = {
-    if (other.count != 0) {
-      totalCnt += other.totalCnt
-
-      val localThisNewWeights = this.newWeights
-      val localOtherNewWeights = other.newWeights
-      val localThisNewMeans = this.newMeans
-      val localOtherNewMeans = other.newMeans
-      val localThisNewCovs = this.newCovs
-      val localOtherNewCovs = other.newCovs
-      var i = 0
-      while (i < k) {
-        localThisNewWeights(i) += localOtherNewWeights(i)
-        BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i))
-        BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i))
-        i += 1
-      }
-      newLogLikelihood += other.newLogLikelihood
+  def add(block: (Matrix, Array[Double])): this.type = {
+    val (matrix: Matrix, weights: Array[Double]) = block
+    require(matrix.isTransposed)
+    val size = matrix.numRows
+    require(weights.length == size)
+
+    val blas1 = BLAS.getBLAS(size)
+    val blas2 = BLAS.getBLAS(k)
+
+    val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k)
+    require(!probMat.isTransposed)
+    java.util.Arrays.fill(probMat.values, EPSILON)
+
+    val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures)
+    var j = 0
+    while (j < k) {
+      val pdfVec = gaussians(j).pdf(matrix, pdfMat)
+      blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1)
+      j += 1
+    }
+
+    var i = 0
+    while (i < size) {
+      val weight = weights(i)
+      val probSum = blas2.dasum(k, probMat.values, i, size)
+      blas2.dscal(k, weight / probSum, probMat.values, i, size)
+      blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1)
+      newLogLikelihood += math.log(probSum) * weight
+      i += 1
+    }
+
+    BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat)
+
+    // compute the cov vector for each row vector
+    val covVec = auxiliaryCovVec
+    val covVecIter = matrix match {
+      case dm: DenseMatrix =>
+        Iterator.tabulate(size) { i =>
+          java.util.Arrays.fill(covVec.values, 0.0)
+          // when input block is dense, directly use nativeBLAS to avoid array copy
+          BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1,
+            covVec.values, 0)
+          covVec
+        }
+
+      case sm: SparseMatrix =>
+        sm.rowIter.map { vec =>
+          java.util.Arrays.fill(covVec.values, 0.0)
+          BLAS.spr(1.0, vec, covVec)
+          covVec
+        }
     }
+
+    covVecIter.zipWithIndex.foreach { case (covVec, i) =>
+      BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1,
+        probMat.values, i, size, newCovsMat.values, 0, covSize)
+    }
+
+    totalCnt += size
     this
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index b35f964c959b..d848d5a5ee45 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -285,6 +285,17 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
     testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset,
       model.getFeaturesCol, model.getProbabilityCol)
   }
+
+  test("GMM on blocks") {
+    Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset =>
+      val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed)
+      val model = gmm.fit(dataset)
+      Seq(2, 4, 8, 16, 32).foreach { blockSize =>
+        val model2 = gmm.setBlockSize(blockSize).fit(dataset)
+        modelEquals(model, model2)
+      }
+    }
+  }
 }
 
 object GaussianMixtureSuite extends SparkFunSuite {
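In add(block) above, the covariance update is applied as one rank-1 update per row: dger (standard BLAS semantics A += alpha * x * y^T, here with a strided y walking the i-th row of probMat) adds covVec scaled by each cluster's responsibility into the covSize x k accumulator. A plain-array sketch of that accumulation (sizes and values hypothetical):

    val covSize = 3
    val k = 2
    val newCovs = Array.ofDim[Double](covSize * k)  // column-major covSize x k accumulator
    val covVec = Array(1.0, 2.0, 3.0)               // packed x * x^T of one row
    val probRow = Array(0.25, 0.75)                 // that row's responsibilities
    for (j <- 0 until k; s <- 0 until covSize) {
      newCovs(j * covSize + s) += probRow(j) * covVec(s)
    }
    assert(newCovs(covSize) == 0.75)                // column 1, first packed entry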
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 984ca411167d..8f0e9aad01b5 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -98,7 +98,8 @@ def numIter(self):
 
 @inherit_doc
 class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
-                             HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol):
+                             HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol,
+                             HasBlockSize):
     """
     Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`.
 
@@ -243,6 +244,8 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
     >>> gm.getMaxIter()
     30
     >>> model = gm.fit(df)
+    >>> model.getBlockSize()
+    1
     >>> model.getAggregationDepth()
     2
     >>> model.getFeaturesCol()
@@ -327,16 +330,16 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav
     @keyword_only
     def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
                  probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
-                 aggregationDepth=2, weightCol=None):
+                 aggregationDepth=2, weightCol=None, blockSize=1):
         """
         __init__(self, featuresCol="features", predictionCol="prediction", k=2, \
                  probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
-                 aggregationDepth=2, weightCol=None)
+                 aggregationDepth=2, weightCol=None, blockSize=1)
         """
         super(GaussianMixture, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture",
                                             self.uid)
-        self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2)
+        self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -347,11 +350,11 @@ def _create_model(self, java_model):
     @since("2.0.0")
     def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
                   probabilityCol="probability", tol=0.01, maxIter=100, seed=None,
-                  aggregationDepth=2, weightCol=None):
+                  aggregationDepth=2, weightCol=None, blockSize=1):
         """
         setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
                   probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \
-                  aggregationDepth=2, weightCol=None)
+                  aggregationDepth=2, weightCol=None, blockSize=1)
 
         Sets params for GaussianMixture.
         """
@@ -421,6 +424,13 @@ def setAggregationDepth(self, value):
         """
         return self._set(aggregationDepth=value)
 
+    @since("3.1.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class GaussianMixtureSummary(ClusteringSummary):
     """