From 985ad64987115aea95591954e88e216a988bd35d Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 30 Jan 2020 17:41:10 +0800 Subject: [PATCH 1/4] init init init init init init init init init init init use nativeBLAS for dense input add py refactor refactor refactor nit revert BLAS.ger revert BLAS.ger revert BLAS.ger nit nit simplify nit nit opt_blas opt_blas nit nit nit --- .../org/apache/spark/ml/linalg/BLAS.scala | 2 +- .../distribution/MultivariateGaussian.scala | 32 +- .../MultivariateGaussianSuite.scala | 10 + .../spark/ml/clustering/GaussianMixture.scala | 306 ++++++++++++++---- .../ml/clustering/GaussianMixtureSuite.scala | 11 + python/pyspark/ml/clustering.py | 22 +- 6 files changed, 313 insertions(+), 70 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 3d3e7a22e594..368f177cda82 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -271,7 +271,7 @@ private[spark] object BLAS extends Serializable { } /** - * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR. + * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR. * * @param U the upper triangular part of the matrix packed in an array (column major) */ diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala index 42746b572702..e8edc9df1495 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala @@ -55,7 +55,7 @@ class MultivariateGaussian @Since("2.0.0") ( */ @transient private lazy val tuple = { val (rootSigmaInv, u) = calculateCovarianceConstants - val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv) + val rootSigmaInvMat = Matrices.fromBreeze(rootSigmaInv).toDense val rootSigmaInvMulMu = rootSigmaInvMat.multiply(mean) (rootSigmaInvMat, u, rootSigmaInvMulMu) } @@ -81,6 +81,36 @@ class MultivariateGaussian @Since("2.0.0") ( u - 0.5 * BLAS.dot(v, v) } + private[ml] def pdf(X: Matrix): DenseVector = { + val mat = DenseMatrix.zeros(X.numRows, X.numCols) + pdf(X, mat) + } + + private[ml] def pdf(X: Matrix, mat: DenseMatrix): DenseVector = { + require(!mat.isTransposed) + + BLAS.gemm(1.0, X, rootSigmaInvMat.transpose, 0.0, mat) + val m = mat.numRows + val n = mat.numCols + + val pdfVec = mat.multiply(rootSigmaInvMulMu) + + val blas = BLAS.getBLAS(n) + val squared1 = BLAS.dot(rootSigmaInvMulMu, rootSigmaInvMulMu) + + val localU = u + var i = 0 + while (i < m) { + val squared2 = blas.ddot(n, mat.values, i, m, mat.values, i, m) + val dot = pdfVec(i) + val squaredSum = squared1 + squared2 - dot - dot + pdfVec.values(i) = math.exp(localU - 0.5 * squaredSum) + i += 1 + } + + pdfVec + } + /** * Calculate distribution dependent components used for the density function: * pdf(x) = (2*pi)^(-k/2)^ * det(sigma)^(-1/2)^ * exp((-1/2) * (x-mu).t * inv(sigma) * (x-mu)) diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala index f2ecff1cc58b..8652d317a85c 100644 --- a/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala +++ b/mllib-local/src/test/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussianSuite.scala @@ -27,6 +27,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { test("univariate") { val x1 = Vectors.dense(0.0) val x2 = Vectors.dense(1.5) + val mat = Matrices.fromVectors(Seq(x1, x2)) val mu = Vectors.dense(0.0) val sigma1 = Matrices.dense(1, 1, Array(1.0)) @@ -35,6 +36,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { assert(dist1.logpdf(x2) ~== -2.0439385332046727 absTol 1E-5) assert(dist1.pdf(x1) ~== 0.39894 absTol 1E-5) assert(dist1.pdf(x2) ~== 0.12952 absTol 1E-5) + assert(dist1.pdf(mat) ~== Vectors.dense(0.39894, 0.12952) absTol 1E-5) val sigma2 = Matrices.dense(1, 1, Array(4.0)) val dist2 = new MultivariateGaussian(mu, sigma2) @@ -42,11 +44,13 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { assert(dist2.logpdf(x2) ~== -1.893335713764618 absTol 1E-5) assert(dist2.pdf(x1) ~== 0.19947 absTol 1E-5) assert(dist2.pdf(x2) ~== 0.15057 absTol 1E-5) + assert(dist2.pdf(mat) ~== Vectors.dense(0.19947, 0.15057) absTol 1E-5) } test("multivariate") { val x1 = Vectors.dense(0.0, 0.0) val x2 = Vectors.dense(1.0, 1.0) + val mat = Matrices.fromVectors(Seq(x1, x2)) val mu = Vectors.dense(0.0, 0.0) val sigma1 = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0)) @@ -55,6 +59,7 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { assert(dist1.logpdf(x2) ~== -2.8378770664093453 absTol 1E-5) assert(dist1.pdf(x1) ~== 0.15915 absTol 1E-5) assert(dist1.pdf(x2) ~== 0.05855 absTol 1E-5) + assert(dist1.pdf(mat) ~== Vectors.dense(0.15915, 0.05855) absTol 1E-5) val sigma2 = Matrices.dense(2, 2, Array(4.0, -1.0, -1.0, 2.0)) val dist2 = new MultivariateGaussian(mu, sigma2) @@ -62,21 +67,25 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { assert(dist2.logpdf(x2) ~== -3.3822607123655732 absTol 1E-5) assert(dist2.pdf(x1) ~== 0.060155 absTol 1E-5) assert(dist2.pdf(x2) ~== 0.033971 absTol 1E-5) + assert(dist2.pdf(mat) ~== Vectors.dense(0.060155, 0.033971) absTol 1E-5) } test("multivariate degenerate") { val x1 = Vectors.dense(0.0, 0.0) val x2 = Vectors.dense(1.0, 1.0) + val mat = Matrices.fromVectors(Seq(x1, x2)) val mu = Vectors.dense(0.0, 0.0) val sigma = Matrices.dense(2, 2, Array(1.0, 1.0, 1.0, 1.0)) val dist = new MultivariateGaussian(mu, sigma) assert(dist.pdf(x1) ~== 0.11254 absTol 1E-5) assert(dist.pdf(x2) ~== 0.068259 absTol 1E-5) + assert(dist.pdf(mat) ~== Vectors.dense(0.11254, 0.068259) absTol 1E-5) } test("SPARK-11302") { val x = Vectors.dense(629, 640, 1.7188, 618.19) + val mat = Matrices.fromVectors(Seq(x)) val mu = Vectors.dense( 1055.3910505836575, 1070.489299610895, 1.39020554474708, 1040.5907503867697) val sigma = Matrices.dense(4, 4, Array( @@ -87,5 +96,6 @@ class MultivariateGaussianSuite extends SparkMLFunSuite { val dist = new MultivariateGaussian(mu, sigma) // Agrees with R's dmvnorm: 7.154782e-05 assert(dist.pdf(x) ~== 7.154782224045512E-5 absTol 1E-9) + assert(dist.pdf(mat) ~== Vectors.dense(7.154782224045512E-5) absTol 1E-5) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 1c4560aa5fdd..b5f1878f8143 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel */ private[clustering] trait GaussianMixtureParams extends Params with HasMaxIter with HasFeaturesCol with HasSeed with HasPredictionCol with HasWeightCol with HasProbabilityCol with HasTol - with HasAggregationDepth { + with HasAggregationDepth with HasBlockSize { /** * Number of independent Gaussians in the mixture model. Must be greater than 1. Default: 2. @@ -279,8 +279,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] { * @param weights Weights for each Gaussian * @return Probability (partial assignment) for each of the k clusters */ - private[clustering] - def computeProbabilities( + private[clustering] def computeProbabilities( features: Vector, dists: Array[MultivariateGaussian], weights: Array[Double]): Array[Double] = { @@ -375,6 +374,25 @@ class GaussianMixture @Since("2.0.0") ( @Since("3.0.0") def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) + /** + * Set block size for stacking input data in matrices. + * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; + * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines + * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). + * Recommended size is between 10 and 1000. An appropriate choice of the block size depends + * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, + * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). + * Note that existing BLAS implementations are mainly optimized for dense matrices, if the + * input dataset is sparse, stacking may bring no performance gain, the worse is possible + * performance regression. + * Default is 1. + * + * @group expertSetParam + */ + @Since("3.1.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + setDefault(blockSize -> 1) + /** * Number of samples per cluster to use when initializing Gaussians. */ @@ -392,7 +410,11 @@ class GaussianMixture @Since("2.0.0") ( s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" + s" matrix is quadratic in the number of features.") - val handlePersistence = dataset.storageLevel == StorageLevel.NONE + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter, + seed, tol, aggregationDepth, blockSize) + instr.logNumFeatures(numFeatures) val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { col($(weightCol)).cast(DoubleType) @@ -401,25 +423,50 @@ class GaussianMixture @Since("2.0.0") ( } val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w) - .as[(Vector, Double)] - .rdd + .as[(Vector, Double)].rdd + .setName("training instances") - if (handlePersistence) { + if ($(blockSize) == 1 && dataset.storageLevel == StorageLevel.NONE) { instances.persist(StorageLevel.MEMORY_AND_DISK) } - val sc = spark.sparkContext - val numClusters = $(k) + // TODO: SPARK-15785 Support users supplied initial GMM. + val (weights, gaussians) = initRandom(instances, $(k), numFeatures) - instr.logPipelineStage(this) - instr.logDataset(dataset) - instr.logParams(this, featuresCol, predictionCol, probabilityCol, weightCol, k, maxIter, - seed, tol, aggregationDepth) - instr.logNumFeatures(numFeatures) + val (logLikelihood, iteration) = if ($(blockSize) == 1) { + trainOnRows(instances, weights, gaussians, numFeatures, instr) + } else { + val sparsity = 1 - instances.map { case (v, _) => v.numNonzeros.toDouble / v.size }.mean() + instr.logNamedValue("sparsity", sparsity.toString) + if (sparsity > 0.5) { + logWarning(s"sparsity of input dataset is $sparsity, " + + s"which may hurt performance in high-level BLAS.") + } + trainOnBlocks(instances, weights, gaussians, numFeatures, instr) + } + if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist() - // TODO: SPARK-15785 Support users supplied initial GMM. - val (weights, gaussians) = initRandom(instances, numClusters, numFeatures) + val gaussianDists = gaussians.map { case (mean, covVec) => + val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values) + new MultivariateGaussian(mean, cov) + } + + val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)) + .setParent(this) + val summary = new GaussianMixtureSummary(model.transform(dataset), + $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration) + instr.logNamedValue("logLikelihood", logLikelihood) + instr.logNamedValue("clusterSizes", summary.clusterSizes) + model.setSummary(Some(summary)) + } + private def trainOnRows( + instances: RDD[(Vector, Double)], + weights: Array[Double], + gaussians: Array[(DenseVector, DenseVector)], + numFeatures: Int, + instr: Instrumentation): (Double, Int) = { + val sc = instances.sparkContext var logLikelihood = Double.MinValue var logLikelihoodPrev = 0.0 @@ -440,7 +487,7 @@ class GaussianMixture @Since("2.0.0") ( val ws = agg.weights.sum if (iteration == 0) weightSumAccum.add(ws) logLikelihoodAccum.add(agg.logLikelihood) - Iterator.tabulate(numClusters) { i => + Iterator.tabulate(bcWeights.value.length) { i => (i, (agg.means(i), agg.covs(i), agg.weights(i), ws)) } } else Iterator.empty @@ -471,21 +518,77 @@ class GaussianMixture @Since("2.0.0") ( instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood) iteration += 1 } - if (handlePersistence) { - instances.unpersist() - } - val gaussianDists = gaussians.map { case (mean, covVec) => - val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values) - new MultivariateGaussian(mean, cov) + (logLikelihood, iteration) + } + + private def trainOnBlocks( + instances: RDD[(Vector, Double)], + weights: Array[Double], + gaussians: Array[(DenseVector, DenseVector)], + numFeatures: Int, + instr: Instrumentation): (Double, Int) = { + val blocks = instances.mapPartitions { iter => + iter.grouped($(blockSize)) + .map { seq => (Matrices.fromVectors(seq.map(_._1)), seq.map(_._2).toArray) } + }.persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val sc = instances.sparkContext + var logLikelihood = Double.MinValue + var logLikelihoodPrev = 0.0 + + var iteration = 0 + while (iteration < $(maxIter) && math.abs(logLikelihood - logLikelihoodPrev) > $(tol)) { + val weightSumAccum = if (iteration == 0) sc.doubleAccumulator else null + val logLikelihoodAccum = sc.doubleAccumulator + val bcWeights = sc.broadcast(weights) + val bcGaussians = sc.broadcast(gaussians) + + // aggregate the cluster contribution for all sample points, + // and then compute the new distributions + blocks.mapPartitions { iter => + if (iter.nonEmpty) { + val agg = new BlockExpectationAggregator(numFeatures, + $(blockSize), bcWeights, bcGaussians) + while (iter.hasNext) { agg.add(iter.next) } + // sum of weights in this partition + val ws = agg.weights.sum + if (iteration == 0) weightSumAccum.add(ws) + logLikelihoodAccum.add(agg.logLikelihood) + agg.meanIter.zip(agg.covIter).zipWithIndex + .map { case ((mean, cov), i) => (i, (mean, cov, agg.weights(i), ws)) } + } else Iterator.empty + }.reduceByKey { case ((mean1, cov1, w1, ws1), (mean2, cov2, w2, ws2)) => + // update the weights, means and covariances for i-th distributions + BLAS.axpy(1.0, mean2, mean1) + BLAS.axpy(1.0, cov2, cov1) + (mean1, cov1, w1 + w2, ws1 + ws2) + }.mapValues { case (mean, cov, w, ws) => + // Create new distributions based on the partial assignments + // (often referred to as the "M" step in literature) + GaussianMixture.updateWeightsAndGaussians(mean, cov, w, ws) + }.collect().foreach { case (i, (weight, gaussian)) => + weights(i) = weight + gaussians(i) = gaussian + } + + bcWeights.destroy() + bcGaussians.destroy() + + if (iteration == 0) { + instr.logNumExamples(weightSumAccum.count) + instr.logSumOfWeights(weightSumAccum.value) + } + + logLikelihoodPrev = logLikelihood // current becomes previous + logLikelihood = logLikelihoodAccum.value // this is the freshly computed log-likelihood + instr.logNamedValue(s"logLikelihood@iter$iteration", logLikelihood) + iteration += 1 } + blocks.unpersist() - val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this) - val summary = new GaussianMixtureSummary(model.transform(dataset), - $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iteration) - instr.logNamedValue("logLikelihood", logLikelihood) - instr.logNamedValue("clusterSizes", summary.clusterSizes) - model.setSummary(Some(summary)) + (logLikelihood, iteration) } @Since("2.0.0") @@ -626,16 +729,15 @@ private class ExpectationAggregator( bcWeights: Broadcast[Array[Double]], bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable { - private val k: Int = bcWeights.value.length - private var totalCnt: Long = 0L - private var newLogLikelihood: Double = 0.0 - private lazy val newWeights: Array[Double] = Array.ofDim[Double](k) - private lazy val newMeans: Array[DenseVector] = Array.fill(k)( - new DenseVector(Array.ofDim[Double](numFeatures))) - private lazy val newCovs: Array[DenseVector] = Array.fill(k)( - new DenseVector(Array.ofDim[Double](numFeatures * (numFeatures + 1) / 2))) + private val k = bcWeights.value.length + private var totalCnt = 0L + private var newLogLikelihood = 0.0 + private val covSize = numFeatures * (numFeatures + 1) / 2 + private lazy val newWeights = Array.ofDim[Double](k) + @transient private lazy val newMeans = Array.fill(k)(Vectors.zeros(numFeatures).toDense) + @transient private lazy val newCovs = Array.fill(k)(Vectors.zeros(covSize).toDense) - @transient private lazy val oldGaussians = { + @transient private lazy val gaussians = { bcGaussians.value.map { case (mean, covVec) => val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values) new MultivariateGaussian(mean, cov) @@ -662,7 +764,7 @@ private class ExpectationAggregator( def add(weightedVector: (Vector, Double)): this.type = { val (instance: Vector, weight: Double) = weightedVector val localWeights = bcWeights.value - val localOldGaussians = oldGaussians + val localOldGaussians = gaussians val prob = new Array[Double](k) var probSum = 0.0 @@ -690,34 +792,114 @@ private class ExpectationAggregator( totalCnt += 1 this } +} + + +/** + * BlockExpectationAggregator computes the partial expectation results. + * + * @param numFeatures The number of features. + * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture. + * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution + * in the mixture. Note only upper triangular part of the covariance + * matrix of each distribution is stored as dense vector (column major) + * in order to reduce shuffled data size. + */ +private class BlockExpectationAggregator( + numFeatures: Int, + blockSize: Int, + bcWeights: Broadcast[Array[Double]], + bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable { + + private val k = bcWeights.value.length + private var totalCnt = 0L + private var newLogLikelihood = 0.0 + private val covSize = numFeatures * (numFeatures + 1) / 2 + private lazy val newWeights = Array.ofDim[Double](k) + @transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k) + @transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k) + @transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k) + @transient private lazy val auxiliaryMat = DenseMatrix.zeros(blockSize, numFeatures) + @transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense + + @transient private lazy val gaussians = { + bcGaussians.value.map { case (mean, covVec) => + val cov = GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values) + new MultivariateGaussian(mean, cov) + } + } + + def count: Long = totalCnt + + def logLikelihood: Double = newLogLikelihood + + def weights: Array[Double] = newWeights + + def meanIter: Iterator[DenseVector] = newMeansMat.colIter.map(_.toDense) + + def covIter: Iterator[DenseVector] = newCovsMat.colIter.map(_.toDense) /** - * Merge another ExpectationAggregator, update the weights, means and covariances - * for each distributions, and update the log likelihood. - * (Note that it's in place merging; as a result, `this` object will be modified.) + * Add a new training instance block to this BlockExpectationAggregator, update the weights, + * means and covariances for each distributions, and update the log likelihood. * - * @param other The other ExpectationAggregator to be merged. - * @return This ExpectationAggregator object. + * @param block The instance block of data point to be added. + * @return This BlockExpectationAggregator object. */ - def merge(other: ExpectationAggregator): this.type = { - if (other.count != 0) { - totalCnt += other.totalCnt - - val localThisNewWeights = this.newWeights - val localOtherNewWeights = other.newWeights - val localThisNewMeans = this.newMeans - val localOtherNewMeans = other.newMeans - val localThisNewCovs = this.newCovs - val localOtherNewCovs = other.newCovs - var i = 0 - while (i < k) { - localThisNewWeights(i) += localOtherNewWeights(i) - BLAS.axpy(1.0, localOtherNewMeans(i), localThisNewMeans(i)) - BLAS.axpy(1.0, localOtherNewCovs(i), localThisNewCovs(i)) - i += 1 - } - newLogLikelihood += other.newLogLikelihood + def add(block: (Matrix, Array[Double])): this.type = { + val (matrix: Matrix, weights: Array[Double]) = block + require(matrix.isTransposed) + val size = matrix.numRows + require(weights.length == size) + + val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k) + require(!probMat.isTransposed) + java.util.Arrays.fill(probMat.values, EPSILON) + + val mat = if (blockSize == size) auxiliaryMat else DenseMatrix.zeros(size, numFeatures) + var j = 0 + val blas1 = BLAS.getBLAS(size) + while (j < k) { + val pdfVec = gaussians(j).pdf(matrix, mat) + blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, + probMat.values, j * size, 1) + j += 1 } + + // compute the cov vector for each row vector + val covVec = auxiliaryCovVec + val covVecIter = matrix match { + case dm: DenseMatrix => + Iterator.tabulate(size) { i => + java.util.Arrays.fill(covVec.values, 0.0) + // when input block is dense, directly use nativeBLAS to avoid array copy + BLAS.nativeBLAS.dspr("U", numFeatures, 1.0, dm.values, i * numFeatures, 1, + covVec.values, 0) + covVec + } + + case sm: SparseMatrix => + sm.rowIter.map { vec => + java.util.Arrays.fill(covVec.values, 0.0) + BLAS.spr(1.0, vec, covVec) + covVec + } + } + + val blas2 = BLAS.getBLAS(k) + covVecIter.zip(weights.iterator).zipWithIndex.foreach { + case ((covVec, weight), i) => + val probSum = blas2.dasum(k, probMat.values, i, size) + blas2.dscal(k, weight / probSum, probMat.values, i, size) + blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1) + BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1, + probMat.values, i, size, newCovsMat.values, 0, covSize) + newLogLikelihood += math.log(probSum) * weight + } + + BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat) + totalCnt += size + this } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala index b35f964c959b..d848d5a5ee45 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala @@ -285,6 +285,17 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest { testClusteringModelSingleProbabilisticPrediction(model, model.predictProbability, dataset, model.getFeaturesCol, model.getProbabilityCol) } + + test("GMM on blocks") { + Seq(dataset, sparseDataset, denseDataset, rDataset).foreach { dataset => + val gmm = new GaussianMixture().setK(k).setMaxIter(20).setBlockSize(1).setSeed(seed) + val model = gmm.fit(dataset) + Seq(2, 4, 8, 16, 32).foreach { blockSize => + val model2 = gmm.setBlockSize(blockSize).fit(dataset) + modelEquals(model, model2) + } + } + } } object GaussianMixtureSuite extends SparkFunSuite { diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 984ca411167d..8f0e9aad01b5 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -98,7 +98,8 @@ def numIter(self): @inherit_doc class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol, - HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol): + HasProbabilityCol, HasTol, HasAggregationDepth, HasWeightCol, + HasBlockSize): """ Params for :py:class:`GaussianMixture` and :py:class:`GaussianMixtureModel`. @@ -243,6 +244,8 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav >>> gm.getMaxIter() 30 >>> model = gm.fit(df) + >>> model.getBlockSize() + 1 >>> model.getAggregationDepth() 2 >>> model.getFeaturesCol() @@ -327,16 +330,16 @@ class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, Jav @keyword_only def __init__(self, featuresCol="features", predictionCol="prediction", k=2, probabilityCol="probability", tol=0.01, maxIter=100, seed=None, - aggregationDepth=2, weightCol=None): + aggregationDepth=2, weightCol=None, blockSize=1): """ __init__(self, featuresCol="features", predictionCol="prediction", k=2, \ probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \ - aggregationDepth=2, weightCol=None) + aggregationDepth=2, weightCol=None, blockSize=1) """ super(GaussianMixture, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.GaussianMixture", self.uid) - self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2) + self._setDefault(k=2, tol=0.01, maxIter=100, aggregationDepth=2, blockSize=1) kwargs = self._input_kwargs self.setParams(**kwargs) @@ -347,11 +350,11 @@ def _create_model(self, java_model): @since("2.0.0") def setParams(self, featuresCol="features", predictionCol="prediction", k=2, probabilityCol="probability", tol=0.01, maxIter=100, seed=None, - aggregationDepth=2, weightCol=None): + aggregationDepth=2, weightCol=None, blockSize=1): """ setParams(self, featuresCol="features", predictionCol="prediction", k=2, \ probabilityCol="probability", tol=0.01, maxIter=100, seed=None, \ - aggregationDepth=2, weightCol=None) + aggregationDepth=2, weightCol=None, blockSize=1) Sets params for GaussianMixture. """ @@ -421,6 +424,13 @@ def setAggregationDepth(self, value): """ return self._set(aggregationDepth=value) + @since("3.1.0") + def setBlockSize(self, value): + """ + Sets the value of :py:attr:`blockSize`. + """ + return self._set(blockSize=value) + class GaussianMixtureSummary(ClusteringSummary): """ From 0eb0f07b0ca8f48287cb9f5e9f3fee6727eebf05 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 7 May 2020 11:00:02 +0800 Subject: [PATCH 2/4] nit --- .../spark/ml/stat/distribution/MultivariateGaussian.scala | 2 +- .../org/apache/spark/ml/clustering/GaussianMixture.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala index e8edc9df1495..a08b8af0fcbf 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala @@ -96,7 +96,7 @@ class MultivariateGaussian @Since("2.0.0") ( val pdfVec = mat.multiply(rootSigmaInvMulMu) val blas = BLAS.getBLAS(n) - val squared1 = BLAS.dot(rootSigmaInvMulMu, rootSigmaInvMulMu) + val squared1 = blas.ddot(n, rootSigmaInvMulMu.values, 1, rootSigmaInvMulMu.values, 1) val localU = u var i = 0 diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index b5f1878f8143..aaf1ea793857 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -764,13 +764,13 @@ private class ExpectationAggregator( def add(weightedVector: (Vector, Double)): this.type = { val (instance: Vector, weight: Double) = weightedVector val localWeights = bcWeights.value - val localOldGaussians = gaussians + val localGaussians = gaussians val prob = new Array[Double](k) var probSum = 0.0 var i = 0 while (i < k) { - val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance) + val p = EPSILON + localWeights(i) * localGaussians(i).pdf(instance) prob(i) = p probSum += p i += 1 From 31f8907a3266f976d87d630056c019bd478652d1 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Thu, 7 May 2020 17:17:56 +0800 Subject: [PATCH 3/4] nit --- .../spark/ml/clustering/GaussianMixture.scala | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index aaf1ea793857..7dd29c0b8958 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -819,7 +819,7 @@ private class BlockExpectationAggregator( @transient private lazy val newMeansMat = DenseMatrix.zeros(numFeatures, k) @transient private lazy val newCovsMat = DenseMatrix.zeros(covSize, k) @transient private lazy val auxiliaryProbMat = DenseMatrix.zeros(blockSize, k) - @transient private lazy val auxiliaryMat = DenseMatrix.zeros(blockSize, numFeatures) + @transient private lazy val auxiliaryPDFMat = DenseMatrix.zeros(blockSize, numFeatures) @transient private lazy val auxiliaryCovVec = Vectors.zeros(covSize).toDense @transient private lazy val gaussians = { @@ -852,20 +852,36 @@ private class BlockExpectationAggregator( val size = matrix.numRows require(weights.length == size) + val blas1 = BLAS.getBLAS(size) + val blas2 = BLAS.getBLAS(k) + val probMat = if (blockSize == size) auxiliaryProbMat else DenseMatrix.zeros(size, k) require(!probMat.isTransposed) java.util.Arrays.fill(probMat.values, EPSILON) - val mat = if (blockSize == size) auxiliaryMat else DenseMatrix.zeros(size, numFeatures) + val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures) + val probSumVec = Vectors.zeros(size).toDense var j = 0 - val blas1 = BLAS.getBLAS(size) while (j < k) { - val pdfVec = gaussians(j).pdf(matrix, mat) - blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, - probMat.values, j * size, 1) + val pdfVec = gaussians(j).pdf(matrix, pdfMat) + val w = bcWeights.value(j) + blas1.daxpy(size, w, pdfVec.values, 1, probSumVec.values, 1) + blas1.daxpy(size, w, pdfVec.values, 0, 1, probMat.values, j * size, 1) j += 1 } + var i = 0 + while (i < size) { + val probSum = probSumVec(i) + val weight = weights(i) + blas2.dscal(k, weight / probSum, probMat.values, i, size) + blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1) + newLogLikelihood += math.log(probSum) * weight + i += 1 + } + + BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat) + // compute the cov vector for each row vector val covVec = auxiliaryCovVec val covVecIter = matrix match { @@ -886,18 +902,11 @@ private class BlockExpectationAggregator( } } - val blas2 = BLAS.getBLAS(k) - covVecIter.zip(weights.iterator).zipWithIndex.foreach { - case ((covVec, weight), i) => - val probSum = blas2.dasum(k, probMat.values, i, size) - blas2.dscal(k, weight / probSum, probMat.values, i, size) - blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1) - BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1, - probMat.values, i, size, newCovsMat.values, 0, covSize) - newLogLikelihood += math.log(probSum) * weight + covVecIter.zipWithIndex.foreach { case (covVec, i) => + BLAS.nativeBLAS.dger(covSize, k, 1.0, covVec.values, 0, 1, + probMat.values, i, size, newCovsMat.values, 0, covSize) } - BLAS.gemm(1.0, matrix.transpose, probMat, 1.0, newMeansMat) totalCnt += size this From a3a005e588b52009a674dc5b9ac237b97017cd25 Mon Sep 17 00:00:00 2001 From: zhengruifeng Date: Sat, 9 May 2020 19:24:54 +0800 Subject: [PATCH 4/4] fix probSum --- .../spark/ml/clustering/GaussianMixture.scala | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index 7dd29c0b8958..6d4137b638dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -758,11 +758,11 @@ private class ExpectationAggregator( * Add a new training instance to this ExpectationAggregator, update the weights, * means and covariances for each distributions, and update the log likelihood. * - * @param weightedVector The instance of data point to be added. + * @param instance The instance of data point to be added. * @return This ExpectationAggregator object. */ - def add(weightedVector: (Vector, Double)): this.type = { - val (instance: Vector, weight: Double) = weightedVector + def add(instance: (Vector, Double)): this.type = { + val (vector: Vector, weight: Double) = instance val localWeights = bcWeights.value val localGaussians = gaussians @@ -770,7 +770,7 @@ private class ExpectationAggregator( var probSum = 0.0 var i = 0 while (i < k) { - val p = EPSILON + localWeights(i) * localGaussians(i).pdf(instance) + val p = EPSILON + localWeights(i) * localGaussians(i).pdf(vector) prob(i) = p probSum += p i += 1 @@ -784,8 +784,8 @@ private class ExpectationAggregator( while (i < k) { val w = prob(i) / probSum * weight localNewWeights(i) += w - BLAS.axpy(w, instance, localNewMeans(i)) - BLAS.spr(w, instance, localNewCovs(i)) + BLAS.axpy(w, vector, localNewMeans(i)) + BLAS.spr(w, vector, localNewCovs(i)) i += 1 } @@ -860,20 +860,17 @@ private class BlockExpectationAggregator( java.util.Arrays.fill(probMat.values, EPSILON) val pdfMat = if (blockSize == size) auxiliaryPDFMat else DenseMatrix.zeros(size, numFeatures) - val probSumVec = Vectors.zeros(size).toDense var j = 0 while (j < k) { val pdfVec = gaussians(j).pdf(matrix, pdfMat) - val w = bcWeights.value(j) - blas1.daxpy(size, w, pdfVec.values, 1, probSumVec.values, 1) - blas1.daxpy(size, w, pdfVec.values, 0, 1, probMat.values, j * size, 1) + blas1.daxpy(size, bcWeights.value(j), pdfVec.values, 0, 1, probMat.values, j * size, 1) j += 1 } var i = 0 while (i < size) { - val probSum = probSumVec(i) val weight = weights(i) + val probSum = blas2.dasum(k, probMat.values, i, size) blas2.dscal(k, weight / probSum, probMat.values, i, size) blas2.daxpy(k, 1.0, probMat.values, i, size, newWeights, 0, 1) newLogLikelihood += math.log(probSum) * weight