From 92e22c8f178192dd6cb5f365595f6ffc9dc692b8 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Thu, 16 Apr 2020 11:10:05 +0800 Subject: [PATCH 01/10] Add KMeansMatrixImpl --- .../spark/ml/clustering/DistanceMeasure.scala | 353 +++++++++++ .../apache/spark/ml/clustering/KMeans.scala | 62 +- .../ml/clustering/KMeansMatrixImpl.scala | 564 ++++++++++++++++++ .../spark/ml/clustering/LocalKMeans.scala | 133 +++++ .../apache/spark/ml/util/DatasetUtils.scala | 33 +- 5 files changed, 1142 insertions(+), 3 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala new file mode 100644 index 0000000000000..01fa0c2bd2759 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors} +import org.apache.spark.ml.linalg.BLAS.{axpy, dot, scal} + +private[spark] abstract class DistanceMeasure extends Serializable { + + /** + * @return the index of the closest center to the given point, as well as the cost. + */ + def findClosest( + centers: TraversableOnce[VectorWithNorm], + point: VectorWithNorm): (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + centers.foreach { center => + val currentDistance = distance(center, point) + if (currentDistance < bestDistance) { + bestDistance = currentDistance + bestIndex = i + } + i += 1 + } + (bestIndex, bestDistance) + } + + /** + * @return the K-means cost of a given point against the given cluster centers. + */ + def pointCost( + centers: TraversableOnce[VectorWithNorm], + point: VectorWithNorm): Double = { + findClosest(centers, point)._2 + } + + /** + * @return whether a center converged or not, given the epsilon parameter. + */ + def isCenterConverged( + oldCenter: VectorWithNorm, + newCenter: VectorWithNorm, + epsilon: Double): Boolean = { + distance(oldCenter, newCenter) <= epsilon + } + + /** + * @return the distance between two points. 
+ */ + def distance( + v1: VectorWithNorm, + v2: VectorWithNorm): Double + + /** + * @return the total cost of the cluster from its aggregated properties + */ + def clusterCost( + centroid: VectorWithNorm, + pointsSum: VectorWithNorm, + numberOfPoints: Long, + pointsSquaredNorm: Double): Double + + /** + * Updates the value of `sum` adding the `point` vector. + * @param point a `VectorWithNorm` to be added to `sum` of a cluster + * @param sum the `sum` for a cluster to be updated + */ + def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = { + axpy(1.0, point.vector, sum) + } + + /** + * Returns a centroid for a cluster given its `sum` vector and its `count` of points. + * + * @param sum the `sum` for a cluster + * @param count the number of points in the cluster + * @return the centroid of the cluster + */ + def centroid(sum: Vector, count: Long): VectorWithNorm = { + scal(1.0 / count, sum) + new VectorWithNorm(sum) + } + + /** + * Returns two new centroids symmetric to the specified centroid applying `noise` with the + * with the specified `level`. + * + * @param level the level of `noise` to apply to the given centroid. + * @param noise a noise vector + * @param centroid the parent centroid + * @return a left and right centroid symmetric to `centroid` + */ + def symmetricCentroids( + level: Double, + noise: Vector, + centroid: Vector): (VectorWithNorm, VectorWithNorm) = { + val left = centroid.copy + axpy(-level, noise, left) + val right = centroid.copy + axpy(level, noise, right) + (new VectorWithNorm(left), new VectorWithNorm(right)) + } + + /** + * @return the cost of a point to be assigned to the cluster centroid + */ + def cost( + point: VectorWithNorm, + centroid: VectorWithNorm): Double = distance(point, centroid) +} + +@Since("2.4.0") +object DistanceMeasure { + + @Since("2.4.0") + val EUCLIDEAN = "euclidean" + @Since("2.4.0") + val COSINE = "cosine" + + private[spark] def decodeFromString(distanceMeasure: String): DistanceMeasure = + distanceMeasure match { + case EUCLIDEAN => new EuclideanDistanceMeasure + case COSINE => new CosineDistanceMeasure + case _ => throw new IllegalArgumentException(s"distanceMeasure must be one of: " + + s"$EUCLIDEAN, $COSINE. $distanceMeasure provided.") + } + + private[spark] def validateDistanceMeasure(distanceMeasure: String): Boolean = { + distanceMeasure match { + case DistanceMeasure.EUCLIDEAN => true + case DistanceMeasure.COSINE => true + case _ => false + } + } +} + +private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { + /** + * @return the index of the closest center to the given point, as well as the squared distance. + */ + override def findClosest( + centers: TraversableOnce[VectorWithNorm], + point: VectorWithNorm): (Int, Double) = { + var bestDistance = Double.PositiveInfinity + var bestIndex = 0 + var i = 0 + centers.foreach { center => + // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary + // distance computation. + var lowerBoundOfSqDist = center.norm - point.norm + lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist + if (lowerBoundOfSqDist < bestDistance) { + val distance: Double = EuclideanDistanceMeasure.fastSquaredDistance(center, point) + if (distance < bestDistance) { + bestDistance = distance + bestIndex = i + } + } + i += 1 + } + (bestIndex, bestDistance) + } + + /** + * @return whether a center converged or not, given the epsilon parameter. 
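+   * The check compares the squared distance between the old and new centers to epsilon squared,
+   * so no square root is taken.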
+ */ + override def isCenterConverged( + oldCenter: VectorWithNorm, + newCenter: VectorWithNorm, + epsilon: Double): Boolean = { + EuclideanDistanceMeasure.fastSquaredDistance(newCenter, oldCenter) <= epsilon * epsilon + } + + /** + * @param v1: first vector + * @param v2: second vector + * @return the Euclidean distance between the two input vectors + */ + override def distance(v1: VectorWithNorm, v2: VectorWithNorm): Double = { + Math.sqrt(EuclideanDistanceMeasure.fastSquaredDistance(v1, v2)) + } + + /** + * @return the total cost of the cluster from its aggregated properties + */ + override def clusterCost( + centroid: VectorWithNorm, + pointsSum: VectorWithNorm, + numberOfPoints: Long, + pointsSquaredNorm: Double): Double = { + math.max(pointsSquaredNorm - numberOfPoints * centroid.norm * centroid.norm, 0.0) + } + + /** + * @return the cost of a point to be assigned to the cluster centroid + */ + override def cost( + point: VectorWithNorm, + centroid: VectorWithNorm): Double = { + EuclideanDistanceMeasure.fastSquaredDistance(point, centroid) + } +} + + +private[spark] object EuclideanDistanceMeasure { + private[ml] lazy val EPSILON = { + var eps = 1.0 + while ((1.0 + (eps / 2.0)) != 1.0) { + eps /= 2.0 + } + eps + } + + private[ml] def fastSquaredDistance( + v1: Vector, + norm1: Double, + v2: Vector, + norm2: Double, + precision: Double = 1e-6): Double = { + val n = v1.size + require(v2.size == n) + require(norm1 >= 0.0 && norm2 >= 0.0) + var sqDist = 0.0 + /* + * The relative error is + *
+     * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
+     * 
+ * which is bounded by + *
+     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
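+     * (the bound uses 2 |a^T b| <= \|a\|_2^2 + \|b\|_2^2 in the numerator and
+     * \|a - b\|_2 >= | \|a\|_2 - \|b\|_2 | in the denominator).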
+     * 
+ * The bound doesn't need the inner product, so we can use it as a sufficient condition to + * check quickly whether the inner product approach is accurate. + */ + if (v1.isInstanceOf[DenseVector] && v2.isInstanceOf[DenseVector]) { + sqDist = Vectors.sqdist(v1, v2) + } else { + val sumSquaredNorm = norm1 * norm1 + norm2 * norm2 + val normDiff = norm1 - norm2 + val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON) + if (precisionBound1 < precision) { + sqDist = sumSquaredNorm - 2.0 * dot(v1, v2) + } else { + val dotValue = dot(v1, v2) + sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0) + val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) / + (sqDist + EPSILON) + if (precisionBound2 > precision) { + sqDist = Vectors.sqdist(v1, v2) + } + } + } + sqDist + } + + /** + * @return the squared Euclidean distance between two vectors computed by + * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. + */ + private[clustering] def fastSquaredDistance( + v1: VectorWithNorm, + v2: VectorWithNorm): Double = { + fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) + } +} + +private[spark] class CosineDistanceMeasure extends DistanceMeasure { + /** + * @param v1: first vector + * @param v2: second vector + * @return the cosine distance between the two input vectors + */ + override def distance(v1: VectorWithNorm, v2: VectorWithNorm): Double = { + assert(v1.norm > 0 && v2.norm > 0, "Cosine distance is not defined for zero-length vectors.") + 1 - dot(v1.vector, v2.vector) / v1.norm / v2.norm + } + + /** + * Updates the value of `sum` adding the `point` vector. + * @param point a `VectorWithNorm` to be added to `sum` of a cluster + * @param sum the `sum` for a cluster to be updated + */ + override def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = { + assert(point.norm > 0, "Cosine distance is not defined for zero-length vectors.") + axpy(1.0 / point.norm, point.vector, sum) + } + + /** + * Returns a centroid for a cluster given its `sum` vector and its `count` of points. + * + * @param sum the `sum` for a cluster + * @param count the number of points in the cluster + * @return the centroid of the cluster + */ + override def centroid(sum: Vector, count: Long): VectorWithNorm = { + scal(1.0 / count, sum) + val norm = Vectors.norm(sum, 2) + scal(1.0 / norm, sum) + new VectorWithNorm(sum, 1) + } + + /** + * @return the total cost of the cluster from its aggregated properties + */ + override def clusterCost( + centroid: VectorWithNorm, + pointsSum: VectorWithNorm, + numberOfPoints: Long, + pointsSquaredNorm: Double): Double = { + val costVector = pointsSum.vector.copy + math.max(numberOfPoints - dot(centroid.vector, costVector) / centroid.norm, 0.0) + } + + /** + * Returns two new centroids symmetric to the specified centroid applying `noise` with the + * with the specified `level`. + * + * @param level the level of `noise` to apply to the given centroid. 
+ * @param noise a noise vector + * @param centroid the parent centroid + * @return a left and right centroid symmetric to `centroid` + */ + override def symmetricCentroids( + level: Double, + noise: Vector, + centroid: Vector): (VectorWithNorm, VectorWithNorm) = { + val (left, right) = super.symmetricCentroids(level, noise, centroid) + val leftVector = left.vector + val rightVector = right.vector + scal(1.0 / left.norm, leftVector) + scal(1.0 / right.norm, rightVector) + (new VectorWithNorm(leftVector, 1.0), new VectorWithNorm(rightVector, 1.0)) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index a42c920e24987..321e9dfd758ed 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path +import org.apache.spark.SparkConf import org.apache.spark.annotation.Since import org.apache.spark.ml.{Estimator, Model, PipelineStage} import org.apache.spark.ml.linalg.Vector @@ -28,7 +29,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented -import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} +import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD @@ -331,7 +332,64 @@ class KMeans @Since("1.5.0") ( def setWeightCol(value: String): this.type = set(weightCol, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr => + override def fit(dataset: Dataset[_]): KMeansModel = { + val conf = new SparkConf(true) + val enableMatrixImpl = conf.getBoolean("spark.ml.kmeans.matrixImplementation.enabled", false) + + if (enableMatrixImpl && $(distanceMeasure) == "euclidean") { + val matrixRowNum = conf.getInt("spark.ml.kmeans.matrixImplementation.rowsPerMatrix", 1000) + fitMatrixImpl(dataset, matrixRowNum) + } else { + // Calling the old K-Means implementation + fitOld(dataset) + } + } + + private def fitMatrixImpl(dataset: Dataset[_], + matrix_row_num: Int): KMeansModel = instrumented { instr => + + instr.logInfo(s"k-means matrix implementation enabled! 
rowsPerMatrix = ${matrix_row_num}") + + transformSchema(dataset.schema, logging = true) + + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + val instances = DatasetUtils.columnToDenseMatrix(dataset, getFeaturesCol, matrix_row_num) + + if (handlePersistence) { + instances.persist(StorageLevel.MEMORY_AND_DISK) + } + + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, + maxIter, seed, tol) + val algo = new KMeansMatrixImpl() + .setK($(k)) + .setInitializationMode($(initMode)) + .setInitializationSteps($(initSteps)) + .setMaxIterations($(maxIter)) + .setSeed($(seed)) + .setEpsilon($(tol)) + .setDistanceMeasure($(distanceMeasure)) + val parentModel = algo.run(instances, matrix_row_num, Option(instr)) + val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) + val summary = new KMeansSummary( + model.transform(dataset), + $(predictionCol), + $(featuresCol), + $(k), + parentModel.numIter, + parentModel.trainingCost) + + model.setSummary(Some(summary)) + instr.logNamedValue("clusterSizes", summary.clusterSizes) + if (handlePersistence) { + instances.unpersist() + } + model + } + + private def fitOld(dataset: Dataset[_]): KMeansModel = instrumented { instr => transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala new file mode 100644 index 0000000000000..f92b7d49f9299 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.annotation.Since +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.ml.linalg.{BLAS, DenseMatrix, Vector, Vectors} +import org.apache.spark.ml.linalg.BLAS.axpy +import org.apache.spark.ml.util.Instrumentation +import org.apache.spark.mllib.clustering.{KMeansModel => MLlibKMeansModel} +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils +import org.apache.spark.util.random.XORShiftRandom + + +/** + * K-means clustering using RDD[DenseMatrix] as input format. 
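+ * Input vectors are packed row-wise into DenseMatrix blocks so that, in every Lloyd iteration,
+ * the pairwise point-to-center products are computed with level-3 BLAS (GEMM) on whole blocks
+ * instead of one dot product per point/center pair.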
+ */ +@Since("0.8.0") +class KMeansMatrixImpl private( + private var k: Int, + private var maxIterations: Int, + private var initializationMode: String, + private var initializationSteps: Int, + private var epsilon: Double, + private var seed: Long, + private var distanceMeasure: String) extends Serializable with Logging { + + @Since("0.8.0") + private def this(k: Int, maxIterations: Int, initializationMode: String, initializationSteps: Int, + epsilon: Double, seed: Long) = + this(k, maxIterations, initializationMode, initializationSteps, + epsilon, seed, DistanceMeasure.EUCLIDEAN) + + /** + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, + * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random, + * distanceMeasure: "euclidean"}. + */ + @Since("0.8.0") + def this() = this(2, 20, KMeansMatrixImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), + DistanceMeasure.EUCLIDEAN) + + /** + * Number of clusters to create (k). + * + * @note It is possible for fewer than k clusters to + * be returned, for example, if there are fewer than k distinct points to cluster. + */ + @Since("1.4.0") + def getK: Int = k + + /** + * Set the number of clusters to create (k). + * + * @note It is possible for fewer than k clusters to + * be returned, for example, if there are fewer than k distinct points to cluster. + * Default: 2. + */ + @Since("0.8.0") + def setK(k: Int): this.type = { + require(k > 0, + s"Number of clusters must be positive but got ${k}") + this.k = k + this + } + + /** + * Maximum number of iterations allowed. + */ + @Since("1.4.0") + def getMaxIterations: Int = maxIterations + + /** + * Set maximum number of iterations allowed. Default: 20. + */ + @Since("0.8.0") + def setMaxIterations(maxIterations: Int): this.type = { + require(maxIterations >= 0, + s"Maximum of iterations must be nonnegative but got ${maxIterations}") + this.maxIterations = maxIterations + this + } + + /** + * The initialization algorithm. This can be either "random" or "k-means||". + */ + @Since("1.4.0") + def getInitializationMode: String = initializationMode + + /** + * Set the initialization algorithm. This can be either "random" to choose random points as + * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ + * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. + */ + @Since("0.8.0") + def setInitializationMode(initializationMode: String): this.type = { + KMeansMatrixImpl.validateInitMode(initializationMode) + this.initializationMode = initializationMode + this + } + + /** + * Number of steps for the k-means|| initialization mode + */ + @Since("1.4.0") + def getInitializationSteps: Int = initializationSteps + + /** + * Set the number of steps for the k-means|| initialization mode. This is an advanced + * setting -- the default of 2 is almost always enough. Default: 2. + */ + @Since("0.8.0") + def setInitializationSteps(initializationSteps: Int): this.type = { + require(initializationSteps > 0, + s"Number of initialization steps must be positive but got ${initializationSteps}") + this.initializationSteps = initializationSteps + this + } + + /** + * The distance threshold within which we've consider centers to have converged. + */ + @Since("1.4.0") + def getEpsilon: Double = epsilon + + /** + * Set the distance threshold within which we've consider centers to have converged. + * If all centers move less than this Euclidean distance, we stop iterating one run. 
+ */ + @Since("0.8.0") + def setEpsilon(epsilon: Double): this.type = { + require(epsilon >= 0, + s"Distance threshold must be nonnegative but got ${epsilon}") + this.epsilon = epsilon + this + } + + /** + * The random seed for cluster initialization. + */ + @Since("1.4.0") + def getSeed: Long = seed + + /** + * Set the random seed for cluster initialization. + */ + @Since("1.4.0") + def setSeed(seed: Long): this.type = { + this.seed = seed + this + } + + /** + * The distance suite used by the algorithm. + */ + @Since("2.4.0") + def getDistanceMeasure: String = distanceMeasure + + /** + * Set the distance suite used by the algorithm. + */ + @Since("2.4.0") + def setDistanceMeasure(distanceMeasure: String): this.type = { + DistanceMeasure.validateDistanceMeasure(distanceMeasure) + this.distanceMeasure = distanceMeasure + this + } + + // Initial cluster centers can be provided as a KMeansModel object rather than using the + // random or k-means|| initializationMode + private var initialModel: Option[MLlibKMeansModel] = None + + /** + * Set the initial starting point, bypassing the random initialization or k-means|| + * The condition model.k == this.k must be met, failure results + * in an IllegalArgumentException. + */ + @Since("1.4.0") + def setInitialModel(model: MLlibKMeansModel): this.type = { + require(model.k == k, "mismatched cluster count") + initialModel = Some(model) + this + } + + private[spark] def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { + + val v: Array[Vector] = vectors.map(_.vector) + + VectorsToDenseMatrix(v.toIterator) + } + + private[spark] def VectorsToDenseMatrix(vectors: Iterator[Vector]): DenseMatrix = { + val vector_array = vectors.toArray + val column_num = vector_array(0).size + val row_num = vector_array.length + + val values = new Array[Double](row_num * column_num) + var rowIndex = 0 + + // convert to column-major dense matrix + for (vector <- vector_array) { + for ((value, index) <- vector.toArray.zipWithIndex) { + values(index * row_num + rowIndex) = value + } + rowIndex = rowIndex + 1 + } + + new DenseMatrix(row_num, column_num, values) + } + + private[spark] def run(data: RDD[DenseMatrix], row_num: Int, + instr: Option[Instrumentation]): MLlibKMeansModel = { + + // Cache RDD data + data.cache.count() + + val model = runAlgorithm(data, instr) + + // Warn at the end of the run as well, for increased visibility. 
+ if (data.getStorageLevel == StorageLevel.NONE) { + logWarning("The input data was not directly cached, which may hurt performance if its" + + " parent RDDs are also uncached.") + } + model + + } + + private def computeSquaredDistances(points_matrix: DenseMatrix, + points_square_sums: DenseMatrix, + centers_matrix: DenseMatrix, + centers_square_sums: DenseMatrix): DenseMatrix = { + // (x + y)^2 = x^2 + y^2 + 2 * x * y + + // Add up squared sums of points and centers (x^2 + y^2) + val ret: DenseMatrix = computeMatrixSum(points_square_sums, centers_square_sums) + + // use GEMM to compute squared distances, (2*x*y) can be decomposed to matrix multiply + val alpha = -2.0 + val beta = 1.0 + BLAS.gemm(alpha, points_matrix, centers_matrix.transpose, beta, ret) + + ret + } + + private def computePointsSquareSum(points_matrix: DenseMatrix, + centers_num: Int): DenseMatrix = { + val points_num = points_matrix.numRows + val ret = DenseMatrix.zeros(points_num, centers_num) + for ((row, index) <- points_matrix.rowIter.zipWithIndex) { + val square = BLAS.dot(row, row) + for (i <- 0 until centers_num) + ret(index, i) = square + } + ret + } + + private def computeCentersSquareSum(centers_matrix: DenseMatrix, + points_num: Int): DenseMatrix = { + val centers_num = centers_matrix.numRows + val ret = DenseMatrix.zeros(points_num, centers_num) + for ((row, index) <- centers_matrix.rowIter.zipWithIndex) { + val square = BLAS.dot(row, row) + for (i <- 0 until points_num) + ret(i, index) = square + } + ret + } + + // use GEMM to compute matrix sum + private def computeMatrixSum(matrix1: DenseMatrix, + matrix2: DenseMatrix): DenseMatrix = { + val column_num = matrix1.numCols + val eye = DenseMatrix.eye(column_num) + val alpha = 1.0 + val beta = 1.0 + BLAS.gemm(alpha, matrix1, eye, beta, matrix2) + matrix2 + } + + private def findClosest(distances: DenseMatrix): (Array[Int], Array[Double]) = { + val points_num = distances.numRows + val ret_closest = new Array[Int](points_num) + val ret_cost = new Array[Double](points_num) + + for ((row, index) <- distances.rowIter.zipWithIndex) { + var closest = 0 + var cost = row(0) + for (i <- 1 until row.size) { + if (row(i) < cost) { + closest = i + cost = row(i) + } + } + ret_closest(index) = closest + // mllib use squared distance as cost, not sqrt + ret_cost(index) = cost + } + + (ret_closest, ret_cost) + + } + + private def runAlgorithm(data: RDD[DenseMatrix], + instr: Option[Instrumentation]): MLlibKMeansModel = { + + val sc = data.sparkContext + + val initStartTime = System.nanoTime() + + val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure) + + val centers = if (initializationMode == KMeansMatrixImpl.RANDOM) { + initRandom(data) + } else { + initKMeansParallel(data, distanceMeasureInstance) + } + + val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 + logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") + + var converged = false + var cost = 0.0 + var iteration = 0 + + val iterationStartTime = System.nanoTime() + + instr.foreach(_.logNumFeatures(centers.head.vector.size)) + + // Execute iterations of Lloyd's algorithm until converged + while (iteration < maxIterations && !converged) { + // Convert center vectors to dense matrix + val centers_matrix = VectorsToDenseMatrix(centers) + + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers_matrix) + + val centers_num = centers_matrix.numRows + val centers_dim = centers_matrix.numCols + + // Compute squared sums for 
points + val data_square_sums: RDD[DenseMatrix] = data.mapPartitions { points_matrices => + points_matrices.map { points_matrix => computePointsSquareSum(points_matrix, centers_num) } + } + + // Find the new centers + val collected = data.zip(data_square_sums).flatMap { + case (points_matrix, points_square_sums) => + val centers_matrix = bcCenters.value + val points_num = points_matrix.numRows + + val sums = Array.fill(centers_num)(Vectors.zeros(centers_dim)) + val counts = Array.fill(centers_num)(0L) + + // Compute squared sums for centers + val centers_square_sums = computeCentersSquareSum(centers_matrix, points_num) + + // Compute squared distances + val distances = computeSquaredDistances( + points_matrix, points_square_sums, + centers_matrix, centers_square_sums) + + val (bestCenters, costs) = findClosest(distances) + + for (cost <- costs) + costAccum.add(cost) + + // sums points around best center + for ((row, index) <- points_matrix.rowIter.zipWithIndex) { + val bestCenter = bestCenters(index) + axpy(1.0, row, sums(bestCenter)) + counts(bestCenter) += 1 + } + + counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => + axpy(1.0, sum2, sum1) + (sum1, count1 + count2) + }.collectAsMap() + + if (iteration == 0) { + instr.foreach(_.logNumExamples(collected.values.map(_._2).sum)) + } + + val newCenters = collected.mapValues { case (sum, count) => + distanceMeasureInstance.centroid(sum, count) + } + + bcCenters.destroy() + + // Update the cluster centers and costs + converged = true + newCenters.foreach { case (j, newCenter) => + if (converged && + !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) { + converged = false + } + centers(j) = newCenter + } + + cost = costAccum.value + iteration += 1 + } + + val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 + logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") + + if (iteration == maxIterations) { + logInfo(s"KMeans reached the max number of iterations: $maxIterations.") + } else { + logInfo(s"KMeans converged in $iteration iterations.") + } + + logInfo(s"The cost is $cost.") + + new MLlibKMeansModel(centers.map { c => OldVectors.fromML(c.vector) }, + distanceMeasure, cost, iteration) + } + + /** + * Initialize a set of cluster centers at random. + */ + private def initRandom(data: RDD[DenseMatrix]): Array[VectorWithNorm] = { + val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.rowIter).map(new VectorWithNorm(_)) + + // Select without replacement; may still produce duplicates if the data has < k distinct + // points, so deduplicate the centroids to match the behavior of k-means|| in the same situation + + vectorWithNorms.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()) + .map(_.vector).distinct.map(new VectorWithNorm(_)) + } + + /** + * Initialize a set of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. + * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. 
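+   * Each of the `initializationSteps` passes samples on average 2 * k new candidate centers,
+   * with probability proportional to each point's current cost; if more than k distinct
+   * candidates are collected, a weighted local k-means++ run reduces them to k centers.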
+ */ + private[clustering] def initKMeansParallel(rdd_matrix: RDD[DenseMatrix], + distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = { + + val data: RDD[VectorWithNorm] = rdd_matrix.flatMap(_.rowIter).map(new VectorWithNorm(_)) + + // Initialize empty centers and point costs. + var costs = data.map(_ => Double.PositiveInfinity) + + // Initialize the first center to a random point. + val seed = new XORShiftRandom(this.seed).nextInt() + val sample = data.takeSample(false, 1, seed) + // Could be empty if data is empty; fail with a better message early: + require(sample.nonEmpty, s"No samples available from $data") + + val centers = ArrayBuffer[VectorWithNorm]() + var newCenters = Seq(sample.head.toDense) + centers ++= newCenters + + // On each step, sample 2 * k points on average with probability proportional + // to their squared distance from the centers. Note that only distances between points + // and new centers are computed in each iteration. + var step = 0 + val bcNewCentersList = ArrayBuffer[Broadcast[_]]() + while (step < initializationSteps) { + val bcNewCenters = data.context.broadcast(newCenters) + bcNewCentersList += bcNewCenters + val preCosts = costs + costs = data.zip(preCosts).map { case (point, cost) => + math.min(distanceMeasureInstance.pointCost(bcNewCenters.value, point), cost) + }.persist(StorageLevel.MEMORY_AND_DISK) + val sumCosts = costs.sum() + + bcNewCenters.unpersist() + preCosts.unpersist() + + val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) => + val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) + pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1) + }.collect() + newCenters = chosen.map(_.toDense) + centers ++= newCenters + step += 1 + } + + costs.unpersist() + bcNewCentersList.foreach(_.destroy()) + + val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)) + + if (distinctCenters.size <= k) { + distinctCenters.toArray + } else { + // Finally, we might have a set of more than k distinct candidate centers; weight each + // candidate by the number of points in the dataset mapping to it and run a local k-means++ + // on the weighted centers to pick k of them + val bcCenters = data.context.broadcast(distinctCenters) + val countMap = data + .map(distanceMeasureInstance.findClosest(bcCenters.value, _)._1) + .countByValue() + + bcCenters.destroy() + + val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray + LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30) + } + } +} + + +/** + * Top-level methods for calling K-means clustering. + */ +@Since("0.8.0") +object KMeansMatrixImpl { + + // Initialization mode names + @Since("0.8.0") + val RANDOM = "random" + @Since("0.8.0") + val K_MEANS_PARALLEL = "k-means||" + + private[spark] def validateInitMode(initMode: String): Boolean = { + initMode match { + case KMeansMatrixImpl.RANDOM => true + case KMeansMatrixImpl.K_MEANS_PARALLEL => true + case _ => false + } + } +} + +/** + * A vector with its norm for fast distance computation. + */ +private[ml] class VectorWithNorm(val vector: Vector, val norm: Double) + extends Serializable { + + def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0)) + + def this(array: Array[Double]) = this(Vectors.dense(array)) + + /** Converts the vector to a dense vector. 
*/ + def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm) +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala new file mode 100644 index 0000000000000..35183e4df4c8e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import scala.util.Random + +import org.apache.spark.internal.Logging +import org.apache.spark.ml.linalg.BLAS.{axpy, scal} +import org.apache.spark.ml.linalg.Vectors + +/** + * An utility object to run K-means locally. This is private to the ML package because it's used + * in the initialization of KMeans but not meant to be publicly exposed. + */ +private[ml] object LocalKMeans extends Logging { + + /** + * Run K-means++ on the weighted point set `points`. This first does the K-means++ + * initialization procedure and then rounds of Lloyd's algorithm. + */ + def kMeansPlusPlus( + seed: Int, + points: Array[VectorWithNorm], + weights: Array[Double], + k: Int, + maxIterations: Int + ): Array[VectorWithNorm] = { + val rand = new Random(seed) + val dimensions = points(0).vector.size + val centers = new Array[VectorWithNorm](k) + + // Initialize centers by sampling using the k-means++ procedure. + centers(0) = pickWeighted(rand, points, weights).toDense + val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0))) + + for (i <- 1 until k) { + val sum = costArray.zip(weights).map(p => p._1 * p._2).sum + val r = rand.nextDouble() * sum + var cumulativeScore = 0.0 + var j = 0 + while (j < points.length && cumulativeScore < r) { + cumulativeScore += weights(j) * costArray(j) + j += 1 + } + if (j == 0) { + logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." 
+ + s" Using duplicate point for center k = $i.") + centers(i) = points(0).toDense + } else { + centers(i) = points(j - 1).toDense + } + + // update costArray + for (p <- points.indices) { + costArray(p) = math.min( + EuclideanDistanceMeasure.fastSquaredDistance(points(p), centers(i)), + costArray(p)) + } + + } + + val distanceMeasureInstance = new EuclideanDistanceMeasure + + // Run up to maxIterations iterations of Lloyd's algorithm + val oldClosest = Array.fill(points.length)(-1) + var iteration = 0 + var moved = true + while (moved && iteration < maxIterations) { + moved = false + val counts = Array.fill(k)(0.0) + val sums = Array.fill(k)(Vectors.zeros(dimensions)) + var i = 0 + while (i < points.length) { + val p = points(i) + val index = distanceMeasureInstance.findClosest(centers, p)._1 + axpy(weights(i), p.vector, sums(index)) + counts(index) += weights(i) + if (index != oldClosest(i)) { + moved = true + oldClosest(i) = index + } + i += 1 + } + // Update centers + var j = 0 + while (j < k) { + if (counts(j) == 0.0) { + // Assign center to a random point + centers(j) = points(rand.nextInt(points.length)).toDense + } else { + scal(1.0 / counts(j), sums(j)) + centers(j) = new VectorWithNorm(sums(j)) + } + j += 1 + } + iteration += 1 + } + + if (iteration == maxIterations) { + logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.") + } else { + logInfo(s"Local KMeans++ converged in $iteration iterations.") + } + + centers + } + + private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = { + val r = rand.nextDouble() * weights.sum + var i = 0 + var curWeight = 0.0 + while (i < data.length && curWeight < r) { + curWeight += weights(i) + i += 1 + } + data(i - 1) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala index 9016940023f74..9f7efe2a9e605 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.util -import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors, VectorUDT} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, Dataset, Row} @@ -69,4 +69,35 @@ private[spark] object DatasetUtils { case Row(point: Vector) => OldVectors.fromML(point) } } + + def VectorsToDenseMatrix(rows: Iterator[Row]): DenseMatrix = { + + val vector_array = rows.map { + case Row(point: Vector) => point + }.toArray + + val column_num = vector_array(0).size + val row_num = vector_array.length + + val values = new Array[Double](row_num * column_num) + var rowIndex = 0 + + // convert to column-major dense matrix + for (vector <- vector_array) { + for ((value, index) <- vector.toArray.zipWithIndex) { + values(index*row_num + rowIndex) = value + } + rowIndex = rowIndex + 1 + } + + new DenseMatrix(row_num, column_num, values) + } + + def columnToDenseMatrix(dataset: Dataset[_], colName: String, rowNum: Int): RDD[DenseMatrix] = { + dataset.select(columnToVector(dataset, colName)) + .rdd.mapPartitions { p => + val groups = p.grouped(rowNum) + groups.map(s => VectorsToDenseMatrix(s.toIterator)) + } + } } From 16285a253cd18bdf3df34ab94e97bebfa0f1769b Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Thu, 7 May 2020 19:43:13 +0800 Subject: [PATCH 02/10] Rebase 
to master and add KMeansBlocksImpl --- .../spark/ml/clustering/DistanceMeasure.scala | 353 ------------------ .../apache/spark/ml/clustering/KMeans.scala | 109 +++--- .../spark/ml/clustering/LocalKMeans.scala | 133 ------- .../apache/spark/ml/util/DatasetUtils.scala | 33 +- .../clustering/KMeansBlocksImpl.scala} | 148 ++++---- 5 files changed, 126 insertions(+), 650 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala rename mllib/src/main/scala/org/apache/spark/{ml/clustering/KMeansMatrixImpl.scala => mllib/clustering/KMeansBlocksImpl.scala} (80%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala deleted file mode 100644 index 01fa0c2bd2759..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/DistanceMeasure.scala +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.clustering - -import org.apache.spark.annotation.Since -import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors} -import org.apache.spark.ml.linalg.BLAS.{axpy, dot, scal} - -private[spark] abstract class DistanceMeasure extends Serializable { - - /** - * @return the index of the closest center to the given point, as well as the cost. - */ - def findClosest( - centers: TraversableOnce[VectorWithNorm], - point: VectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - val currentDistance = distance(center, point) - if (currentDistance < bestDistance) { - bestDistance = currentDistance - bestIndex = i - } - i += 1 - } - (bestIndex, bestDistance) - } - - /** - * @return the K-means cost of a given point against the given cluster centers. - */ - def pointCost( - centers: TraversableOnce[VectorWithNorm], - point: VectorWithNorm): Double = { - findClosest(centers, point)._2 - } - - /** - * @return whether a center converged or not, given the epsilon parameter. - */ - def isCenterConverged( - oldCenter: VectorWithNorm, - newCenter: VectorWithNorm, - epsilon: Double): Boolean = { - distance(oldCenter, newCenter) <= epsilon - } - - /** - * @return the distance between two points. - */ - def distance( - v1: VectorWithNorm, - v2: VectorWithNorm): Double - - /** - * @return the total cost of the cluster from its aggregated properties - */ - def clusterCost( - centroid: VectorWithNorm, - pointsSum: VectorWithNorm, - numberOfPoints: Long, - pointsSquaredNorm: Double): Double - - /** - * Updates the value of `sum` adding the `point` vector. 
- * @param point a `VectorWithNorm` to be added to `sum` of a cluster - * @param sum the `sum` for a cluster to be updated - */ - def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = { - axpy(1.0, point.vector, sum) - } - - /** - * Returns a centroid for a cluster given its `sum` vector and its `count` of points. - * - * @param sum the `sum` for a cluster - * @param count the number of points in the cluster - * @return the centroid of the cluster - */ - def centroid(sum: Vector, count: Long): VectorWithNorm = { - scal(1.0 / count, sum) - new VectorWithNorm(sum) - } - - /** - * Returns two new centroids symmetric to the specified centroid applying `noise` with the - * with the specified `level`. - * - * @param level the level of `noise` to apply to the given centroid. - * @param noise a noise vector - * @param centroid the parent centroid - * @return a left and right centroid symmetric to `centroid` - */ - def symmetricCentroids( - level: Double, - noise: Vector, - centroid: Vector): (VectorWithNorm, VectorWithNorm) = { - val left = centroid.copy - axpy(-level, noise, left) - val right = centroid.copy - axpy(level, noise, right) - (new VectorWithNorm(left), new VectorWithNorm(right)) - } - - /** - * @return the cost of a point to be assigned to the cluster centroid - */ - def cost( - point: VectorWithNorm, - centroid: VectorWithNorm): Double = distance(point, centroid) -} - -@Since("2.4.0") -object DistanceMeasure { - - @Since("2.4.0") - val EUCLIDEAN = "euclidean" - @Since("2.4.0") - val COSINE = "cosine" - - private[spark] def decodeFromString(distanceMeasure: String): DistanceMeasure = - distanceMeasure match { - case EUCLIDEAN => new EuclideanDistanceMeasure - case COSINE => new CosineDistanceMeasure - case _ => throw new IllegalArgumentException(s"distanceMeasure must be one of: " + - s"$EUCLIDEAN, $COSINE. $distanceMeasure provided.") - } - - private[spark] def validateDistanceMeasure(distanceMeasure: String): Boolean = { - distanceMeasure match { - case DistanceMeasure.EUCLIDEAN => true - case DistanceMeasure.COSINE => true - case _ => false - } - } -} - -private[spark] class EuclideanDistanceMeasure extends DistanceMeasure { - /** - * @return the index of the closest center to the given point, as well as the squared distance. - */ - override def findClosest( - centers: TraversableOnce[VectorWithNorm], - point: VectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary - // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = EuclideanDistanceMeasure.fastSquaredDistance(center, point) - if (distance < bestDistance) { - bestDistance = distance - bestIndex = i - } - } - i += 1 - } - (bestIndex, bestDistance) - } - - /** - * @return whether a center converged or not, given the epsilon parameter. 
- */ - override def isCenterConverged( - oldCenter: VectorWithNorm, - newCenter: VectorWithNorm, - epsilon: Double): Boolean = { - EuclideanDistanceMeasure.fastSquaredDistance(newCenter, oldCenter) <= epsilon * epsilon - } - - /** - * @param v1: first vector - * @param v2: second vector - * @return the Euclidean distance between the two input vectors - */ - override def distance(v1: VectorWithNorm, v2: VectorWithNorm): Double = { - Math.sqrt(EuclideanDistanceMeasure.fastSquaredDistance(v1, v2)) - } - - /** - * @return the total cost of the cluster from its aggregated properties - */ - override def clusterCost( - centroid: VectorWithNorm, - pointsSum: VectorWithNorm, - numberOfPoints: Long, - pointsSquaredNorm: Double): Double = { - math.max(pointsSquaredNorm - numberOfPoints * centroid.norm * centroid.norm, 0.0) - } - - /** - * @return the cost of a point to be assigned to the cluster centroid - */ - override def cost( - point: VectorWithNorm, - centroid: VectorWithNorm): Double = { - EuclideanDistanceMeasure.fastSquaredDistance(point, centroid) - } -} - - -private[spark] object EuclideanDistanceMeasure { - private[ml] lazy val EPSILON = { - var eps = 1.0 - while ((1.0 + (eps / 2.0)) != 1.0) { - eps /= 2.0 - } - eps - } - - private[ml] def fastSquaredDistance( - v1: Vector, - norm1: Double, - v2: Vector, - norm2: Double, - precision: Double = 1e-6): Double = { - val n = v1.size - require(v2.size == n) - require(norm1 >= 0.0 && norm2 >= 0.0) - var sqDist = 0.0 - /* - * The relative error is - *
-     * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
-     * 
- * which is bounded by - *
-     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
-     * 
- * The bound doesn't need the inner product, so we can use it as a sufficient condition to - * check quickly whether the inner product approach is accurate. - */ - if (v1.isInstanceOf[DenseVector] && v2.isInstanceOf[DenseVector]) { - sqDist = Vectors.sqdist(v1, v2) - } else { - val sumSquaredNorm = norm1 * norm1 + norm2 * norm2 - val normDiff = norm1 - norm2 - val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON) - if (precisionBound1 < precision) { - sqDist = sumSquaredNorm - 2.0 * dot(v1, v2) - } else { - val dotValue = dot(v1, v2) - sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0) - val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) / - (sqDist + EPSILON) - if (precisionBound2 > precision) { - sqDist = Vectors.sqdist(v1, v2) - } - } - } - sqDist - } - - /** - * @return the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. - */ - private[clustering] def fastSquaredDistance( - v1: VectorWithNorm, - v2: VectorWithNorm): Double = { - fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) - } -} - -private[spark] class CosineDistanceMeasure extends DistanceMeasure { - /** - * @param v1: first vector - * @param v2: second vector - * @return the cosine distance between the two input vectors - */ - override def distance(v1: VectorWithNorm, v2: VectorWithNorm): Double = { - assert(v1.norm > 0 && v2.norm > 0, "Cosine distance is not defined for zero-length vectors.") - 1 - dot(v1.vector, v2.vector) / v1.norm / v2.norm - } - - /** - * Updates the value of `sum` adding the `point` vector. - * @param point a `VectorWithNorm` to be added to `sum` of a cluster - * @param sum the `sum` for a cluster to be updated - */ - override def updateClusterSum(point: VectorWithNorm, sum: Vector): Unit = { - assert(point.norm > 0, "Cosine distance is not defined for zero-length vectors.") - axpy(1.0 / point.norm, point.vector, sum) - } - - /** - * Returns a centroid for a cluster given its `sum` vector and its `count` of points. - * - * @param sum the `sum` for a cluster - * @param count the number of points in the cluster - * @return the centroid of the cluster - */ - override def centroid(sum: Vector, count: Long): VectorWithNorm = { - scal(1.0 / count, sum) - val norm = Vectors.norm(sum, 2) - scal(1.0 / norm, sum) - new VectorWithNorm(sum, 1) - } - - /** - * @return the total cost of the cluster from its aggregated properties - */ - override def clusterCost( - centroid: VectorWithNorm, - pointsSum: VectorWithNorm, - numberOfPoints: Long, - pointsSquaredNorm: Double): Double = { - val costVector = pointsSum.vector.copy - math.max(numberOfPoints - dot(centroid.vector, costVector) / centroid.norm, 0.0) - } - - /** - * Returns two new centroids symmetric to the specified centroid applying `noise` with the - * with the specified `level`. - * - * @param level the level of `noise` to apply to the given centroid. 
- * @param noise a noise vector - * @param centroid the parent centroid - * @return a left and right centroid symmetric to `centroid` - */ - override def symmetricCentroids( - level: Double, - noise: Vector, - centroid: Vector): (VectorWithNorm, VectorWithNorm) = { - val (left, right) = super.symmetricCentroids(level, noise, centroid) - val leftVector = left.vector - val rightVector = right.vector - scal(1.0 / left.norm, leftVector) - scal(1.0 / right.norm, rightVector) - (new VectorWithNorm(leftVector, 1.0), new VectorWithNorm(rightVector, 1.0)) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 321e9dfd758ed..358652da9d823 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -21,15 +21,15 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.SparkConf import org.apache.spark.annotation.Since import org.apache.spark.ml.{Estimator, Model, PipelineStage} +import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented -import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} +import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansBlocksImpl, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD @@ -43,7 +43,8 @@ import org.apache.spark.util.VersionUtils.majorVersion * Common params for KMeans and KMeansModel */ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol - with HasSeed with HasPredictionCol with HasTol with HasDistanceMeasure with HasWeightCol { + with HasSeed with HasPredictionCol with HasTol with HasDistanceMeasure with HasWeightCol + with HasBlockSize { /** * The number of clusters to create (k). Must be > 1. Note that it is possible for fewer than @@ -331,39 +332,59 @@ class KMeans @Since("1.5.0") ( @Since("3.0.0") def setWeightCol(value: String): this.type = set(weightCol, value) + /** + * Set block size for stacking input data in matrices. + * If blockSize == 1, then stacking will be skipped, and each vector is treated individually; + * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines + * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV). + * Recommended size is between 10 and 1000. An appropriate choice of the block size depends + * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example, + * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads). + * Note that existing BLAS implementations are mainly optimized for dense matrices, if the + * input dataset is sparse, stacking may bring no performance gain, the worse is possible + * performance regression. + * Default is 1. 
+ * + * @group expertSetParam + */ + @Since("3.1.0") + def setBlockSize(value: Int): this.type = set(blockSize, value) + setDefault(blockSize -> 1) + @Since("2.0.0") - override def fit(dataset: Dataset[_]): KMeansModel = { - val conf = new SparkConf(true) - val enableMatrixImpl = conf.getBoolean("spark.ml.kmeans.matrixImplementation.enabled", false) + override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr => + + transformSchema(dataset.schema, logging = true) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, + maxIter, seed, tol, weightCol, blockSize) - if (enableMatrixImpl && $(distanceMeasure) == "euclidean") { - val matrixRowNum = conf.getInt("spark.ml.kmeans.matrixImplementation.rowsPerMatrix", 1000) - fitMatrixImpl(dataset, matrixRowNum) + if ($(blockSize) == 1) { + trainOnRows(dataset) } else { - // Calling the old K-Means implementation - fitOld(dataset) + trainOnBlocks(dataset) } } - private def fitMatrixImpl(dataset: Dataset[_], - matrix_row_num: Int): KMeansModel = instrumented { instr => - - instr.logInfo(s"k-means matrix implementation enabled! rowsPerMatrix = ${matrix_row_num}") - - transformSchema(dataset.schema, logging = true) - + private def trainOnRows(dataset: Dataset[_]): KMeansModel = instrumented { instr => val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instances = DatasetUtils.columnToDenseMatrix(dataset, getFeaturesCol, matrix_row_num) + val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { + col($(weightCol)).cast(DoubleType) + } else { + lit(1.0) + } + + val instances: RDD[(OldVector, Double)] = dataset + .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map { + case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) + } if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) } - instr.logPipelineStage(this) - instr.logDataset(dataset) - instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, - maxIter, seed, tol) - val algo = new KMeansMatrixImpl() + val algo = new MLlibKMeans() .setK($(k)) .setInitializationMode($(initMode)) .setInitializationSteps($(initSteps)) @@ -371,7 +392,7 @@ class KMeans @Since("1.5.0") ( .setSeed($(seed)) .setEpsilon($(tol)) .setDistanceMeasure($(distanceMeasure)) - val parentModel = algo.run(instances, matrix_row_num, Option(instr)) + val parentModel = algo.runWithWeight(instances, Option(instr)) val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) val summary = new KMeansSummary( model.transform(dataset), @@ -389,38 +410,28 @@ class KMeans @Since("1.5.0") ( model } - private def fitOld(dataset: Dataset[_]): KMeansModel = instrumented { instr => - transformSchema(dataset.schema, logging = true) + private def trainOnBlocks(dataset: Dataset[_]): KMeansModel = instrumented { instr => - val handlePersistence = dataset.storageLevel == StorageLevel.NONE val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { col($(weightCol)).cast(DoubleType) } else { lit(1.0) } - val instances: RDD[(OldVector, Double)] = dataset + val instances: RDD[Instance] = dataset .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map { - case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) + case Row(point: Vector, weight: Double) => Instance(0.0, weight, point) } - if (handlePersistence) { - instances.persist(StorageLevel.MEMORY_AND_DISK) - } + val blocks = 
InstanceBlock.blokify(instances, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") + + val algo = new KMeansBlocksImpl($(k), $(maxIter), $(initMode), + $(initSteps), $(tol), $(seed), $(distanceMeasure)) + + val parentModel = algo.runAlgorithmWithWeight(blocks, Option(instr)) - instr.logPipelineStage(this) - instr.logDataset(dataset) - instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, - maxIter, seed, tol, weightCol) - val algo = new MLlibKMeans() - .setK($(k)) - .setInitializationMode($(initMode)) - .setInitializationSteps($(initSteps)) - .setMaxIterations($(maxIter)) - .setSeed($(seed)) - .setEpsilon($(tol)) - .setDistanceMeasure($(distanceMeasure)) - val parentModel = algo.runWithWeight(instances, Option(instr)) val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) val summary = new KMeansSummary( model.transform(dataset), @@ -432,9 +443,9 @@ class KMeans @Since("1.5.0") ( model.setSummary(Some(summary)) instr.logNamedValue("clusterSizes", summary.clusterSizes) - if (handlePersistence) { - instances.unpersist() - } + + blocks.unpersist() + model } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala deleted file mode 100644 index 35183e4df4c8e..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LocalKMeans.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.clustering - -import scala.util.Random - -import org.apache.spark.internal.Logging -import org.apache.spark.ml.linalg.BLAS.{axpy, scal} -import org.apache.spark.ml.linalg.Vectors - -/** - * An utility object to run K-means locally. This is private to the ML package because it's used - * in the initialization of KMeans but not meant to be publicly exposed. - */ -private[ml] object LocalKMeans extends Logging { - - /** - * Run K-means++ on the weighted point set `points`. This first does the K-means++ - * initialization procedure and then rounds of Lloyd's algorithm. - */ - def kMeansPlusPlus( - seed: Int, - points: Array[VectorWithNorm], - weights: Array[Double], - k: Int, - maxIterations: Int - ): Array[VectorWithNorm] = { - val rand = new Random(seed) - val dimensions = points(0).vector.size - val centers = new Array[VectorWithNorm](k) - - // Initialize centers by sampling using the k-means++ procedure. 
- centers(0) = pickWeighted(rand, points, weights).toDense - val costArray = points.map(EuclideanDistanceMeasure.fastSquaredDistance(_, centers(0))) - - for (i <- 1 until k) { - val sum = costArray.zip(weights).map(p => p._1 * p._2).sum - val r = rand.nextDouble() * sum - var cumulativeScore = 0.0 - var j = 0 - while (j < points.length && cumulativeScore < r) { - cumulativeScore += weights(j) * costArray(j) - j += 1 - } - if (j == 0) { - logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." + - s" Using duplicate point for center k = $i.") - centers(i) = points(0).toDense - } else { - centers(i) = points(j - 1).toDense - } - - // update costArray - for (p <- points.indices) { - costArray(p) = math.min( - EuclideanDistanceMeasure.fastSquaredDistance(points(p), centers(i)), - costArray(p)) - } - - } - - val distanceMeasureInstance = new EuclideanDistanceMeasure - - // Run up to maxIterations iterations of Lloyd's algorithm - val oldClosest = Array.fill(points.length)(-1) - var iteration = 0 - var moved = true - while (moved && iteration < maxIterations) { - moved = false - val counts = Array.fill(k)(0.0) - val sums = Array.fill(k)(Vectors.zeros(dimensions)) - var i = 0 - while (i < points.length) { - val p = points(i) - val index = distanceMeasureInstance.findClosest(centers, p)._1 - axpy(weights(i), p.vector, sums(index)) - counts(index) += weights(i) - if (index != oldClosest(i)) { - moved = true - oldClosest(i) = index - } - i += 1 - } - // Update centers - var j = 0 - while (j < k) { - if (counts(j) == 0.0) { - // Assign center to a random point - centers(j) = points(rand.nextInt(points.length)).toDense - } else { - scal(1.0 / counts(j), sums(j)) - centers(j) = new VectorWithNorm(sums(j)) - } - j += 1 - } - iteration += 1 - } - - if (iteration == maxIterations) { - logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"Local KMeans++ converged in $iteration iterations.") - } - - centers - } - - private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = { - val r = rand.nextDouble() * weights.sum - var i = 0 - var curWeight = 0.0 - while (i < data.length && curWeight < r) { - curWeight += weights(i) - i += 1 - } - data(i - 1) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala index 9f7efe2a9e605..9016940023f74 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.util -import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Column, Dataset, Row} @@ -69,35 +69,4 @@ private[spark] object DatasetUtils { case Row(point: Vector) => OldVectors.fromML(point) } } - - def VectorsToDenseMatrix(rows: Iterator[Row]): DenseMatrix = { - - val vector_array = rows.map { - case Row(point: Vector) => point - }.toArray - - val column_num = vector_array(0).size - val row_num = vector_array.length - - val values = new Array[Double](row_num * column_num) - var rowIndex = 0 - - // convert to column-major dense matrix - for (vector <- vector_array) { - for ((value, index) <- vector.toArray.zipWithIndex) { - values(index*row_num + rowIndex) = value 
- } - rowIndex = rowIndex + 1 - } - - new DenseMatrix(row_num, column_num, values) - } - - def columnToDenseMatrix(dataset: Dataset[_], colName: String, rowNum: Int): RDD[DenseMatrix] = { - dataset.select(columnToVector(dataset, colName)) - .rdd.mapPartitions { p => - val groups = p.grouped(rowNum) - groups.map(s => VectorsToDenseMatrix(s.toIterator)) - } - } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala similarity index 80% rename from mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala rename to mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala index f92b7d49f9299..f9abe644f4b4e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansMatrixImpl.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala @@ -15,29 +15,26 @@ * limitations under the License. */ -package org.apache.spark.ml.clustering +package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.ml.linalg.{BLAS, DenseMatrix, Vector, Vectors} -import org.apache.spark.ml.linalg.BLAS.axpy +import org.apache.spark.ml.feature.InstanceBlock import org.apache.spark.ml.util.Instrumentation -import org.apache.spark.mllib.clustering.{KMeansModel => MLlibKMeansModel} -import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, Vector, Vectors} +import org.apache.spark.mllib.linalg.BLAS.axpy import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom - /** - * K-means clustering using RDD[DenseMatrix] as input format. + * K-means clustering using RDD[InstanceBlock] as input format. */ -@Since("0.8.0") -class KMeansMatrixImpl private( +class KMeansBlocksImpl ( private var k: Int, private var maxIterations: Int, private var initializationMode: String, @@ -46,19 +43,12 @@ class KMeansMatrixImpl private( private var seed: Long, private var distanceMeasure: String) extends Serializable with Logging { - @Since("0.8.0") - private def this(k: Int, maxIterations: Int, initializationMode: String, initializationSteps: Int, - epsilon: Double, seed: Long) = - this(k, maxIterations, initializationMode, initializationSteps, - epsilon, seed, DistanceMeasure.EUCLIDEAN) - /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random, * distanceMeasure: "euclidean"}. 
*/ - @Since("0.8.0") - def this() = this(2, 20, KMeansMatrixImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), + def this() = this(2, 20, KMeansBlocksImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), DistanceMeasure.EUCLIDEAN) /** @@ -115,7 +105,7 @@ class KMeansMatrixImpl private( */ @Since("0.8.0") def setInitializationMode(initializationMode: String): this.type = { - KMeansMatrixImpl.validateInitMode(initializationMode) + KMeansBlocksImpl.validateInitMode(initializationMode) this.initializationMode = initializationMode this } @@ -189,8 +179,9 @@ class KMeansMatrixImpl private( // Initial cluster centers can be provided as a KMeansModel object rather than using the // random or k-means|| initializationMode - private var initialModel: Option[MLlibKMeansModel] = None + private var initialModel: Option[KMeansModel] = None +/* /** * Set the initial starting point, bypassing the random initialization or k-means|| * The condition model.k == this.k must be met, failure results @@ -202,15 +193,15 @@ class KMeansMatrixImpl private( initialModel = Some(model) this } - - private[spark] def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { +*/ + private def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { val v: Array[Vector] = vectors.map(_.vector) VectorsToDenseMatrix(v.toIterator) } - private[spark] def VectorsToDenseMatrix(vectors: Iterator[Vector]): DenseMatrix = { + private def VectorsToDenseMatrix(vectors: Iterator[Vector]): DenseMatrix = { val vector_array = vectors.toArray val column_num = vector_array(0).size val row_num = vector_array.length @@ -229,22 +220,22 @@ class KMeansMatrixImpl private( new DenseMatrix(row_num, column_num, values) } - private[spark] def run(data: RDD[DenseMatrix], row_num: Int, - instr: Option[Instrumentation]): MLlibKMeansModel = { - - // Cache RDD data - data.cache.count() - - val model = runAlgorithm(data, instr) - - // Warn at the end of the run as well, for increased visibility. - if (data.getStorageLevel == StorageLevel.NONE) { - logWarning("The input data was not directly cached, which may hurt performance if its" - + " parent RDDs are also uncached.") - } - model - - } +// private[spark] def runWithWeight(data: RDD[InstanceBlock], row_num: Int, +// instr: Option[Instrumentation]): MLlibKMeansModel = { +// +// // Cache RDD data +// data.cache.count() +// +// val model = runAlgorithmWithWeight(data, instr) +// +// // Warn at the end of the run as well, for increased visibility. 
+// if (data.getStorageLevel == StorageLevel.NONE) { +// logWarning("The input data was not directly cached, which may hurt performance if its" +// + " parent RDDs are also uncached.") +// } +// model +// +// } private def computeSquaredDistances(points_matrix: DenseMatrix, points_square_sums: DenseMatrix, @@ -298,7 +289,8 @@ class KMeansMatrixImpl private( matrix2 } - private def findClosest(distances: DenseMatrix): (Array[Int], Array[Double]) = { + private def findClosest(distances: DenseMatrix, + weights: Array[Double]): (Array[Int], Array[Double]) = { val points_num = distances.numRows val ret_closest = new Array[Int](points_num) val ret_cost = new Array[Double](points_num) @@ -313,16 +305,17 @@ class KMeansMatrixImpl private( } } ret_closest(index) = closest - // mllib use squared distance as cost, not sqrt - ret_cost(index) = cost + + // use weighted squared distance as cost + ret_cost(index) = cost * weights(index) } (ret_closest, ret_cost) } - private def runAlgorithm(data: RDD[DenseMatrix], - instr: Option[Instrumentation]): MLlibKMeansModel = { + def runAlgorithmWithWeight(data: RDD[InstanceBlock], + instr: Option[Instrumentation]): KMeansModel = { val sc = data.sparkContext @@ -330,7 +323,7 @@ class KMeansMatrixImpl private( val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure) - val centers = if (initializationMode == KMeansMatrixImpl.RANDOM) { + val centers = if (initializationMode == KMeansBlocksImpl.RANDOM) { initRandom(data) } else { initKMeansParallel(data, distanceMeasureInstance) @@ -359,15 +352,16 @@ class KMeansMatrixImpl private( val centers_dim = centers_matrix.numCols // Compute squared sums for points - val data_square_sums: RDD[DenseMatrix] = data.mapPartitions { points_matrices => - points_matrices.map { points_matrix => computePointsSquareSum(points_matrix, centers_num) } + val data_square_sums: RDD[DenseMatrix] = data.mapPartitions { blocks => + blocks.map { block => + computePointsSquareSum(DenseMatrix.fromML(block.matrix.toDense), centers_num) } } // Find the new centers val collected = data.zip(data_square_sums).flatMap { - case (points_matrix, points_square_sums) => + case (blocks, points_square_sums) => val centers_matrix = bcCenters.value - val points_num = points_matrix.numRows + val points_num = blocks.matrix.numRows val sums = Array.fill(centers_num)(Vectors.zeros(centers_dim)) val counts = Array.fill(centers_num)(0L) @@ -377,18 +371,18 @@ class KMeansMatrixImpl private( // Compute squared distances val distances = computeSquaredDistances( - points_matrix, points_square_sums, + DenseMatrix.fromML(blocks.matrix.toDense), points_square_sums, centers_matrix, centers_square_sums) - val (bestCenters, costs) = findClosest(distances) + val (bestCenters, weightedCosts) = findClosest(distances, blocks.weights) - for (cost <- costs) + for (cost <- weightedCosts) costAccum.add(cost) // sums points around best center - for ((row, index) <- points_matrix.rowIter.zipWithIndex) { + for ((row, index) <- blocks.matrix.rowIter.zipWithIndex) { val bestCenter = bestCenters(index) - axpy(1.0, row, sums(bestCenter)) + axpy(blocks.weights(index), Vectors.fromML(row), sums(bestCenter)) counts(bestCenter) += 1 } @@ -433,15 +427,18 @@ class KMeansMatrixImpl private( logInfo(s"The cost is $cost.") - new MLlibKMeansModel(centers.map { c => OldVectors.fromML(c.vector) }, + new KMeansModel(centers.map { c => c.vector }, distanceMeasure, cost, iteration) } /** * Initialize a set of cluster centers at random. 
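   * Rows are recovered from the stacked `InstanceBlock`s by iterating each block's matrix
   * rows, so the sample is drawn from the same set of points as in the row-based code path.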
*/ - private def initRandom(data: RDD[DenseMatrix]): Array[VectorWithNorm] = { - val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.rowIter).map(new VectorWithNorm(_)) + private def initRandom(data: RDD[InstanceBlock]): Array[VectorWithNorm] = { + + val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.matrix.rowIter) + .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) + // Select without replacement; may still produce duplicates if the data has < k distinct // points, so deduplicate the centroids to match the behavior of k-means|| in the same situation @@ -459,10 +456,11 @@ class KMeansMatrixImpl private( * * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. */ - private[clustering] def initKMeansParallel(rdd_matrix: RDD[DenseMatrix], - distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = { + private[clustering] def initKMeansParallel(rdd_matrix: RDD[InstanceBlock], + distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = { - val data: RDD[VectorWithNorm] = rdd_matrix.flatMap(_.rowIter).map(new VectorWithNorm(_)) + val data: RDD[VectorWithNorm] = rdd_matrix.flatMap(_.matrix.rowIter) + .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) // Initialize empty centers and point costs. var costs = data.map(_ => Double.PositiveInfinity) @@ -474,7 +472,7 @@ class KMeansMatrixImpl private( require(sample.nonEmpty, s"No samples available from $data") val centers = ArrayBuffer[VectorWithNorm]() - var newCenters = Seq(sample.head.toDense) + var newCenters = Array(sample.head.toDense) centers ++= newCenters // On each step, sample 2 * k points on average with probability proportional @@ -506,10 +504,10 @@ class KMeansMatrixImpl private( costs.unpersist() bcNewCentersList.foreach(_.destroy()) - val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)) + val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)).toArray - if (distinctCenters.size <= k) { - distinctCenters.toArray + if (distinctCenters.length <= k) { + distinctCenters } else { // Finally, we might have a set of more than k distinct candidate centers; weight each // candidate by the number of points in the dataset mapping to it and run a local k-means++ @@ -522,7 +520,7 @@ class KMeansMatrixImpl private( bcCenters.destroy() val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray - LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30) + LocalKMeans.kMeansPlusPlus(0, distinctCenters, myWeights, k, 30) } } } @@ -531,34 +529,18 @@ class KMeansMatrixImpl private( /** * Top-level methods for calling K-means clustering. */ -@Since("0.8.0") -object KMeansMatrixImpl { +object KMeansBlocksImpl { // Initialization mode names - @Since("0.8.0") val RANDOM = "random" - @Since("0.8.0") val K_MEANS_PARALLEL = "k-means||" private[spark] def validateInitMode(initMode: String): Boolean = { initMode match { - case KMeansMatrixImpl.RANDOM => true - case KMeansMatrixImpl.K_MEANS_PARALLEL => true + case KMeansBlocksImpl.RANDOM => true + case KMeansBlocksImpl.K_MEANS_PARALLEL => true case _ => false } } } -/** - * A vector with its norm for fast distance computation. - */ -private[ml] class VectorWithNorm(val vector: Vector, val norm: Double) - extends Serializable { - - def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0)) - - def this(array: Array[Double]) = this(Vectors.dense(array)) - - /** Converts the vector to a dense vector. 
*/ - def toDense: VectorWithNorm = new VectorWithNorm(Vectors.dense(vector.toArray), norm) -} From 6655ed48d024af7344dfb7a7ccee240665d1db37 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Sun, 10 May 2020 14:34:11 +0800 Subject: [PATCH 03/10] Add test case, add weight.nonEmpty condition when weights are all 1 --- .../apache/spark/ml/clustering/KMeans.scala | 7 +- .../mllib/clustering/KMeansBlocksImpl.scala | 175 ++---------------- .../spark/ml/clustering/KMeansSuite.scala | 14 ++ 3 files changed, 31 insertions(+), 165 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 358652da9d823..38f86bc8a5e67 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -278,7 +278,9 @@ class KMeans @Since("1.5.0") ( initMode -> MLlibKMeans.K_MEANS_PARALLEL, initSteps -> 2, tol -> 1e-4, - distanceMeasure -> DistanceMeasure.EUCLIDEAN) + distanceMeasure -> DistanceMeasure.EUCLIDEAN, + blockSize -> 1 + ) @Since("1.5.0") override def copy(extra: ParamMap): KMeans = defaultCopy(extra) @@ -411,7 +413,6 @@ class KMeans @Since("1.5.0") ( } private def trainOnBlocks(dataset: Dataset[_]): KMeansModel = instrumented { instr => - val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { col($(weightCol)).cast(DoubleType) } else { @@ -430,7 +431,7 @@ class KMeans @Since("1.5.0") ( val algo = new KMeansBlocksImpl($(k), $(maxIter), $(initMode), $(initSteps), $(tol), $(seed), $(distanceMeasure)) - val parentModel = algo.runAlgorithmWithWeight(blocks, Option(instr)) + val parentModel = algo.runWithWeight(blocks, Option(instr)) val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) val summary = new KMeansSummary( diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala index f9abe644f4b4e..71387f621871b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala @@ -19,7 +19,6 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer -import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.InstanceBlock @@ -51,149 +50,10 @@ class KMeansBlocksImpl ( def this() = this(2, 20, KMeansBlocksImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), DistanceMeasure.EUCLIDEAN) - /** - * Number of clusters to create (k). - * - * @note It is possible for fewer than k clusters to - * be returned, for example, if there are fewer than k distinct points to cluster. - */ - @Since("1.4.0") - def getK: Int = k - - /** - * Set the number of clusters to create (k). - * - * @note It is possible for fewer than k clusters to - * be returned, for example, if there are fewer than k distinct points to cluster. - * Default: 2. - */ - @Since("0.8.0") - def setK(k: Int): this.type = { - require(k > 0, - s"Number of clusters must be positive but got ${k}") - this.k = k - this - } - - /** - * Maximum number of iterations allowed. - */ - @Since("1.4.0") - def getMaxIterations: Int = maxIterations - - /** - * Set maximum number of iterations allowed. Default: 20. 
- */ - @Since("0.8.0") - def setMaxIterations(maxIterations: Int): this.type = { - require(maxIterations >= 0, - s"Maximum of iterations must be nonnegative but got ${maxIterations}") - this.maxIterations = maxIterations - this - } - - /** - * The initialization algorithm. This can be either "random" or "k-means||". - */ - @Since("1.4.0") - def getInitializationMode: String = initializationMode - - /** - * Set the initialization algorithm. This can be either "random" to choose random points as - * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ - * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. - */ - @Since("0.8.0") - def setInitializationMode(initializationMode: String): this.type = { - KMeansBlocksImpl.validateInitMode(initializationMode) - this.initializationMode = initializationMode - this - } - - /** - * Number of steps for the k-means|| initialization mode - */ - @Since("1.4.0") - def getInitializationSteps: Int = initializationSteps - - /** - * Set the number of steps for the k-means|| initialization mode. This is an advanced - * setting -- the default of 2 is almost always enough. Default: 2. - */ - @Since("0.8.0") - def setInitializationSteps(initializationSteps: Int): this.type = { - require(initializationSteps > 0, - s"Number of initialization steps must be positive but got ${initializationSteps}") - this.initializationSteps = initializationSteps - this - } - - /** - * The distance threshold within which we've consider centers to have converged. - */ - @Since("1.4.0") - def getEpsilon: Double = epsilon - - /** - * Set the distance threshold within which we've consider centers to have converged. - * If all centers move less than this Euclidean distance, we stop iterating one run. - */ - @Since("0.8.0") - def setEpsilon(epsilon: Double): this.type = { - require(epsilon >= 0, - s"Distance threshold must be nonnegative but got ${epsilon}") - this.epsilon = epsilon - this - } - - /** - * The random seed for cluster initialization. - */ - @Since("1.4.0") - def getSeed: Long = seed - - /** - * Set the random seed for cluster initialization. - */ - @Since("1.4.0") - def setSeed(seed: Long): this.type = { - this.seed = seed - this - } - - /** - * The distance suite used by the algorithm. - */ - @Since("2.4.0") - def getDistanceMeasure: String = distanceMeasure - - /** - * Set the distance suite used by the algorithm. - */ - @Since("2.4.0") - def setDistanceMeasure(distanceMeasure: String): this.type = { - DistanceMeasure.validateDistanceMeasure(distanceMeasure) - this.distanceMeasure = distanceMeasure - this - } - // Initial cluster centers can be provided as a KMeansModel object rather than using the // random or k-means|| initializationMode private var initialModel: Option[KMeansModel] = None -/* - /** - * Set the initial starting point, bypassing the random initialization or k-means|| - * The condition model.k == this.k must be met, failure results - * in an IllegalArgumentException. 
- */ - @Since("1.4.0") - def setInitialModel(model: MLlibKMeansModel): this.type = { - require(model.k == k, "mismatched cluster count") - initialModel = Some(model) - this - } -*/ private def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { val v: Array[Vector] = vectors.map(_.vector) @@ -220,28 +80,11 @@ class KMeansBlocksImpl ( new DenseMatrix(row_num, column_num, values) } -// private[spark] def runWithWeight(data: RDD[InstanceBlock], row_num: Int, -// instr: Option[Instrumentation]): MLlibKMeansModel = { -// -// // Cache RDD data -// data.cache.count() -// -// val model = runAlgorithmWithWeight(data, instr) -// -// // Warn at the end of the run as well, for increased visibility. -// if (data.getStorageLevel == StorageLevel.NONE) { -// logWarning("The input data was not directly cached, which may hurt performance if its" -// + " parent RDDs are also uncached.") -// } -// model -// -// } - private def computeSquaredDistances(points_matrix: DenseMatrix, points_square_sums: DenseMatrix, centers_matrix: DenseMatrix, centers_square_sums: DenseMatrix): DenseMatrix = { - // (x + y)^2 = x^2 + y^2 + 2 * x * y + // (x - y)^2 = x^2 + y^2 - 2 * x * y // Add up squared sums of points and centers (x^2 + y^2) val ret: DenseMatrix = computeMatrixSum(points_square_sums, centers_square_sums) @@ -307,15 +150,19 @@ class KMeansBlocksImpl ( ret_closest(index) = closest // use weighted squared distance as cost - ret_cost(index) = cost * weights(index) + if (weights.nonEmpty) { + ret_cost(index) = cost * weights(index) + } else { + ret_cost(index) = cost + } } (ret_closest, ret_cost) } - def runAlgorithmWithWeight(data: RDD[InstanceBlock], - instr: Option[Instrumentation]): KMeansModel = { + def runWithWeight(data: RDD[InstanceBlock], + instr: Option[Instrumentation]): KMeansModel = { val sc = data.sparkContext @@ -382,7 +229,11 @@ class KMeansBlocksImpl ( // sums points around best center for ((row, index) <- blocks.matrix.rowIter.zipWithIndex) { val bestCenter = bestCenters(index) - axpy(blocks.weights(index), Vectors.fromML(row), sums(bestCenter)) + if (blocks.weights.nonEmpty) { + axpy(blocks.weights(index), Vectors.fromML(row), sums(bestCenter)) + } else { + axpy(1, Vectors.fromML(row), sums(bestCenter)) + } counts(bestCenter) += 1 } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 584594436267f..c34e51b658e2c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -58,6 +58,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes assert(kmeans.getInitSteps === 2) assert(kmeans.getTol === 1e-4) assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN) + assert(kmeans.getBlockSize == 1) val model = kmeans.setMaxIter(1).fit(dataset) val transformed = model.transform(dataset) @@ -69,6 +70,19 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes assert(copiedModel.hasSummary) } + test("default parameters on blocks") { + val kmeans = new KMeans() + val model = kmeans.fit(dataset) + Seq(4, 16, 64).foreach { blockSize => + val model2 = kmeans.setBlockSize(blockSize).fit(dataset) + + model.clusterCenters.zip(model2.clusterCenters).map { + case (v1, v2) => assert(v1 ~== v2 relTol 1e-9) + } + assert(model.summary.trainingCost ~== model2.summary.trainingCost relTol 1e-9) + } + } + test("set parameters") { val kmeans 
= new KMeans() .setK(9) From 5c3fb7e1ba43b23a6312753def1dd4cd2770e2a4 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Fri, 22 May 2020 16:37:09 +0800 Subject: [PATCH 04/10] Improve fit impl, moved KMeansBlocksImpl to ml and integrating WIP --- .../apache/spark/ml/clustering/KMeans.scala | 109 ++++++++---------- .../clustering/KMeansBlocksImpl.scala | 8 +- 2 files changed, 52 insertions(+), 65 deletions(-) rename mllib/src/main/scala/org/apache/spark/{mllib => ml}/clustering/KMeansBlocksImpl.scala (98%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 38f86bc8a5e67..cf75b13738d67 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -29,7 +29,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented -import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansBlocksImpl, KMeansModel => MLlibKMeansModel} +import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD @@ -355,21 +355,8 @@ class KMeans @Since("1.5.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr => - transformSchema(dataset.schema, logging = true) - instr.logPipelineStage(this) - instr.logDataset(dataset) - instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, - maxIter, seed, tol, weightCol, blockSize) - if ($(blockSize) == 1) { - trainOnRows(dataset) - } else { - trainOnBlocks(dataset) - } - } - - private def trainOnRows(dataset: Dataset[_]): KMeansModel = instrumented { instr => val handlePersistence = dataset.storageLevel == StorageLevel.NONE val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { col($(weightCol)).cast(DoubleType) @@ -377,77 +364,75 @@ class KMeans @Since("1.5.0") ( lit(1.0) } - val instances: RDD[(OldVector, Double)] = dataset + val instances: RDD[(Vector, Double)] = dataset .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map { - case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) + case Row(point: Vector, weight: Double) => (point, weight) } if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) } - val algo = new MLlibKMeans() - .setK($(k)) - .setInitializationMode($(initMode)) - .setInitializationSteps($(initSteps)) - .setMaxIterations($(maxIter)) - .setSeed($(seed)) - .setEpsilon($(tol)) - .setDistanceMeasure($(distanceMeasure)) - val parentModel = algo.runWithWeight(instances, Option(instr)) - val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) + instr.logPipelineStage(this) + instr.logDataset(dataset) + instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure, + maxIter, seed, tol, weightCol, blockSize) + + val model = if ($(blockSize) == 1) { + trainOnRows(instances) + } else { + trainOnBlocks(instances) + } + val summary = new KMeansSummary( model.transform(dataset), $(predictionCol), $(featuresCol), $(k), - parentModel.numIter, - parentModel.trainingCost) - + model.parentModel.numIter, + model.parentModel.trainingCost) 
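+    // Attach the summary built above; it is derived from the parent MLlib model and the
+    // transformed dataset, regardless of whether the row-based or block-based path ran.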
model.setSummary(Some(summary)) - instr.logNamedValue("clusterSizes", summary.clusterSizes) + + instr.logNamedValue("clusterSizes", model.summary.clusterSizes) if (handlePersistence) { instances.unpersist() } model } - private def trainOnBlocks(dataset: Dataset[_]): KMeansModel = instrumented { instr => - val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) { - col($(weightCol)).cast(DoubleType) - } else { - lit(1.0) - } - - val instances: RDD[Instance] = dataset - .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map { - case Row(point: Vector, weight: Double) => Instance(0.0, weight, point) - } - - val blocks = InstanceBlock.blokify(instances, $(blockSize)) - .persist(StorageLevel.MEMORY_AND_DISK) - .setName(s"training dataset (blockSize=${$(blockSize)})") - - val algo = new KMeansBlocksImpl($(k), $(maxIter), $(initMode), - $(initSteps), $(tol), $(seed), $(distanceMeasure)) - - val parentModel = algo.runWithWeight(blocks, Option(instr)) + private def trainOnRows(instances: RDD[(Vector, Double)]): KMeansModel = + instrumented { instr => + val oldVectorInstances = instances.map { + case (point: Vector, weight: Double) => (OldVectors.fromML(point), weight) + } + val algo = new MLlibKMeans() + .setK($(k)) + .setInitializationMode($(initMode)) + .setInitializationSteps($(initSteps)) + .setMaxIterations($(maxIter)) + .setSeed($(seed)) + .setEpsilon($(tol)) + .setDistanceMeasure($(distanceMeasure)) + val parentModel = algo.runWithWeight(oldVectorInstances, Option(instr)) + val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) + model + } - val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) - val summary = new KMeansSummary( - model.transform(dataset), - $(predictionCol), - $(featuresCol), - $(k), - parentModel.numIter, - parentModel.trainingCost) + private def trainOnBlocks(instances: RDD[(Vector, Double)]): KMeansModel = + instrumented { instr => + val instanceRDD: RDD[Instance] = instances.map { + case (point: Vector, weight: Double) => Instance(0.0, weight, point) + } - model.setSummary(Some(summary)) - instr.logNamedValue("clusterSizes", summary.clusterSizes) + val blocks = InstanceBlock.blokify(instanceRDD, $(blockSize)) + .persist(StorageLevel.MEMORY_AND_DISK) + .setName(s"training dataset (blockSize=${$(blockSize)})") - blocks.unpersist() + val algo = new KMeansBlocksImpl($(k), $(maxIter), $(initMode), + $(initSteps), $(tol), $(seed), $(distanceMeasure)) - model + val model = algo.runWithWeight(blocks, Option(instr)) + model } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala similarity index 98% rename from mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala rename to mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala index 71387f621871b..96bc56b3a2045 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala @@ -15,16 +15,18 @@ * limitations under the License. 
*/ -package org.apache.spark.mllib.clustering +package org.apache.spark.ml.clustering import scala.collection.mutable.ArrayBuffer import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.InstanceBlock +import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors} +import org.apache.spark.ml.linalg.BLAS +import org.apache.spark.ml.linalg.BLAS.axpy import org.apache.spark.ml.util.Instrumentation -import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, Vector, Vectors} -import org.apache.spark.mllib.linalg.BLAS.axpy +import org.apache.spark.mllib.clustering.DistanceMeasure import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils From 73a37b4668e35adbe748f57887158a9ab162f146 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Fri, 22 May 2020 18:46:03 +0800 Subject: [PATCH 05/10] change VectorWithNorm to public, add initBlocksRandom and initBlocksKMeansParallel --- .../spark/mllib/clustering/KMeans.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 1c5de5a092d6e..14adb3d5796ad 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.annotation.Since import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.InstanceBlock +import org.apache.spark.ml.linalg.{Vector => MLVector} import org.apache.spark.ml.util.Instrumentation import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.axpy @@ -349,6 +351,22 @@ class KMeans private ( new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration) } + def initBlocksRandom(data: RDD[InstanceBlock]): Array[MLVector] = { + val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.matrix.rowIter) + .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) + + initRandom(vectorWithNorms).map(_.vector.asML) + } + + def initBlocksKMeansParallel(rdd_matrix: RDD[InstanceBlock], + distanceMeasureInstance: DistanceMeasure): Array[MLVector] = { + + val data: RDD[VectorWithNorm] = rdd_matrix.flatMap(_.matrix.rowIter) + .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) + + initKMeansParallel(data, distanceMeasureInstance).map(_.vector.asML) + } + /** * Initialize a set of cluster centers at random. */ @@ -517,7 +535,7 @@ object KMeans { /** * A vector with its norm for fast distance computation. 
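 * For example, given `val v = Vectors.dense(1.0, 2.0)`, `new VectorWithNorm(v, Vectors.norm(v, 2.0))`
 * pairs the vector with its precomputed L2 norm so that distance code such as
 * `EuclideanDistanceMeasure.fastSquaredDistance` can reuse the norm instead of recomputing it.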
*/ -private[clustering] class VectorWithNorm( +class VectorWithNorm( val vector: Vector, val norm: Double, val weight: Double = 1.0) extends Serializable { From 619dd6c59a3cb8e386b0c0f2183eb12282fe1272 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Fri, 22 May 2020 18:47:00 +0800 Subject: [PATCH 06/10] Integrating KMeansBlocksImpl to KMeans --- .../apache/spark/ml/clustering/KMeans.scala | 222 +++++++++- .../ml/clustering/KMeansBlocksImpl.scala | 399 ------------------ 2 files changed, 210 insertions(+), 411 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index cf75b13738d67..38fa50b9fb826 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -17,28 +17,27 @@ package org.apache.spark.ml.clustering -import scala.collection.mutable - import org.apache.hadoop.fs.Path - import org.apache.spark.annotation.Since -import org.apache.spark.ml.{Estimator, Model, PipelineStage} import org.apache.spark.ml.feature.{Instance, InstanceBlock} -import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.linalg.{BLAS, DenseMatrix, Matrices, Vector, Vectors} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util._ import org.apache.spark.ml.util.Instrumentation.instrumented -import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} -import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.ml.util._ +import org.apache.spark.ml.{Estimator, Model, PipelineStage} +import org.apache.spark.mllib.clustering.{DistanceMeasure, VectorWithNorm, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion +import scala.collection.mutable + /** * Common params for KMeans and KMeansModel */ @@ -428,13 +427,212 @@ class KMeans @Since("1.5.0") ( .persist(StorageLevel.MEMORY_AND_DISK) .setName(s"training dataset (blockSize=${$(blockSize)})") - val algo = new KMeansBlocksImpl($(k), $(maxIter), $(initMode), - $(initSteps), $(tol), $(seed), $(distanceMeasure)) + val sc = instances.sparkContext + + val initStartTime = System.nanoTime() + + val distanceMeasureInstance = DistanceMeasure.decodeFromString($(distanceMeasure)) + + val mllibKMeans = new MLlibKMeans() + + val centers = if (initMode == "random") { + mllibKMeans.initBlocksRandom(blocks) + } else { + mllibKMeans.initBlocksKMeansParallel(blocks, distanceMeasureInstance) + } + + val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 + logInfo(f"Initialization with $initMode took $initTimeInSeconds%.3f seconds.") + + var converged = false + var cost = 0.0 + var iteration = 0 + + val iterationStartTime = System.nanoTime() + + instr.logNumFeatures(centers.head.size) + + // Execute iterations 
of Lloyd's algorithm until converged + while (iteration < $(maxIter) && !converged) { + // Convert center vectors to dense matrix + val centers_matrix = Matrices.fromVectors(centers).toDense + + val costAccum = sc.doubleAccumulator + val bcCenters = sc.broadcast(centers_matrix) + + val centers_num = centers_matrix.numRows + val centers_dim = centers_matrix.numCols + + // Compute squared sums for points + val data_square_sums: RDD[DenseMatrix] = blocks.mapPartitions { p => + p.map { block => + computePointsSquareSum(block.matrix.toDense, centers_num) } + } + + // Find the new centers + val collected = blocks.zip(data_square_sums).flatMap { + case (block, points_square_sums) => + val centers_matrix = bcCenters.value + val points_num = block.matrix.numRows + + val sums = Array.fill(centers_num)(Vectors.zeros(centers_dim)) + val counts = Array.fill(centers_num)(0L) + + // Compute squared sums for centers + val centers_square_sums = computeCentersSquareSum(centers_matrix, points_num) + + // Compute squared distances + val distances = computeSquaredDistances( + block.matrix.toDense, points_square_sums, + centers_matrix, centers_square_sums) + + val (bestCenters, weightedCosts) = findClosest( + distances, block.weights) + + for (cost <- weightedCosts) + costAccum.add(cost) + + // sums points around best center + for ((row, index) <- block.matrix.rowIter.zipWithIndex) { + val bestCenter = bestCenters(index) + if (block.weights.nonEmpty) { + BLAS.axpy(block.weights(index), row, sums(bestCenter)) + } else { + BLAS.axpy(1, row, sums(bestCenter)) + } + counts(bestCenter) += 1 + } + + counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator + }.reduceByKey { case ((sum1, count1), (sum2, count2)) => + BLAS.axpy(1.0, sum2, sum1) + (sum1, count1 + count2) + }.collectAsMap() + + if (iteration == 0) { + instr.logNumExamples(collected.values.map(_._2).sum) + } + + val newCenters = collected.mapValues { case (sum, count) => + distanceMeasureInstance.centroid(sum, count) + } + + bcCenters.destroy() + + // Update the cluster centers and costs + converged = true + newCenters.foreach { case (j, newCenter) => + if (converged && + !distanceMeasureInstance.isCenterConverged( + new VectorWithNorm(centers(j)), newCenter, ${tol})) { + converged = false + } + centers(j) = newCenter.vector + } + + cost = costAccum.value + iteration += 1 + } + + val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 + logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") + + if (iteration == ${maxIter}) { + logInfo(s"KMeans reached the max number of iterations: $maxIter.") + } else { + logInfo(s"KMeans converged in $iteration iterations.") + } + + logInfo(s"The cost is $cost.") + + val parentModel = new MLlibKMeansModel(centers.map(OldVectors.fromML(_)), + distanceMeasure, cost, iteration) - val model = algo.runWithWeight(blocks, Option(instr)) + val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) model } + private def computeSquaredDistances(points_matrix: DenseMatrix, + points_square_sums: DenseMatrix, + centers_matrix: DenseMatrix, + centers_square_sums: DenseMatrix): DenseMatrix = { + // (x - y)^2 = x^2 + y^2 - 2 * x * y + + // Add up squared sums of points and centers (x^2 + y^2) + val ret: DenseMatrix = computeMatrixSum(points_square_sums, centers_square_sums) + + // use GEMM to compute squared distances, (2*x*y) can be decomposed to matrix multiply + val alpha = -2.0 + val beta = 1.0 + BLAS.gemm(alpha, points_matrix, centers_matrix.transpose, beta, ret) 
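+    // After the GEMM above, ret(i, j) = ||x_i||^2 + ||y_j||^2 - 2 * x_i . y_j
+    //                                 = ||x_i - y_j||^2 for point i and center j:
+    // the pre-filled squared norms supply the first two terms and the
+    // -2.0 * points_matrix * centers_matrix^T product adds the cross terms,
+    // computing all point-to-center squared distances in a single BLAS-3 call.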
+ + ret + } + + private def computePointsSquareSum(points_matrix: DenseMatrix, + centers_num: Int): DenseMatrix = { + val points_num = points_matrix.numRows + val ret = DenseMatrix.zeros(points_num, centers_num) + for ((row, index) <- points_matrix.rowIter.zipWithIndex) { + val square = BLAS.dot(row, row) + for (i <- 0 until centers_num) + ret(index, i) = square + } + ret + } + + private def computeCentersSquareSum(centers_matrix: DenseMatrix, + points_num: Int): DenseMatrix = { + val centers_num = centers_matrix.numRows + val ret = DenseMatrix.zeros(points_num, centers_num) + for ((row, index) <- centers_matrix.rowIter.zipWithIndex) { + val square = BLAS.dot(row, row) + for (i <- 0 until points_num) + ret(i, index) = square + } + ret + } + + // use GEMM to compute matrix sum + private def computeMatrixSum(matrix1: DenseMatrix, + matrix2: DenseMatrix): DenseMatrix = { + val column_num = matrix1.numCols + val eye = DenseMatrix.eye(column_num) + val alpha = 1.0 + val beta = 1.0 + BLAS.gemm(alpha, matrix1, eye, beta, matrix2) + matrix2 + } + + private def findClosest(distances: DenseMatrix, + weights: Array[Double]): (Array[Int], Array[Double]) = { + val points_num = distances.numRows + val ret_closest = new Array[Int](points_num) + val ret_cost = new Array[Double](points_num) + + for ((row, index) <- distances.rowIter.zipWithIndex) { + var closest = 0 + var cost = row(0) + for (i <- 1 until row.size) { + if (row(i) < cost) { + closest = i + cost = row(i) + } + } + ret_closest(index) = closest + + // use weighted squared distance as cost + if (weights.nonEmpty) { + ret_cost(index) = cost * weights(index) + } else { + ret_cost(index) = cost + } + } + + (ret_closest, ret_cost) + + } + @Since("1.5.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala deleted file mode 100644 index 96bc56b3a2045..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeansBlocksImpl.scala +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.clustering - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.InstanceBlock -import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors} -import org.apache.spark.ml.linalg.BLAS -import org.apache.spark.ml.linalg.BLAS.axpy -import org.apache.spark.ml.util.Instrumentation -import org.apache.spark.mllib.clustering.DistanceMeasure -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils -import org.apache.spark.util.random.XORShiftRandom - -/** - * K-means clustering using RDD[InstanceBlock] as input format. - */ -class KMeansBlocksImpl ( - private var k: Int, - private var maxIterations: Int, - private var initializationMode: String, - private var initializationSteps: Int, - private var epsilon: Double, - private var seed: Long, - private var distanceMeasure: String) extends Serializable with Logging { - - /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, - * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random, - * distanceMeasure: "euclidean"}. - */ - def this() = this(2, 20, KMeansBlocksImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), - DistanceMeasure.EUCLIDEAN) - - // Initial cluster centers can be provided as a KMeansModel object rather than using the - // random or k-means|| initializationMode - private var initialModel: Option[KMeansModel] = None - - private def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { - - val v: Array[Vector] = vectors.map(_.vector) - - VectorsToDenseMatrix(v.toIterator) - } - - private def VectorsToDenseMatrix(vectors: Iterator[Vector]): DenseMatrix = { - val vector_array = vectors.toArray - val column_num = vector_array(0).size - val row_num = vector_array.length - - val values = new Array[Double](row_num * column_num) - var rowIndex = 0 - - // convert to column-major dense matrix - for (vector <- vector_array) { - for ((value, index) <- vector.toArray.zipWithIndex) { - values(index * row_num + rowIndex) = value - } - rowIndex = rowIndex + 1 - } - - new DenseMatrix(row_num, column_num, values) - } - - private def computeSquaredDistances(points_matrix: DenseMatrix, - points_square_sums: DenseMatrix, - centers_matrix: DenseMatrix, - centers_square_sums: DenseMatrix): DenseMatrix = { - // (x - y)^2 = x^2 + y^2 - 2 * x * y - - // Add up squared sums of points and centers (x^2 + y^2) - val ret: DenseMatrix = computeMatrixSum(points_square_sums, centers_square_sums) - - // use GEMM to compute squared distances, (2*x*y) can be decomposed to matrix multiply - val alpha = -2.0 - val beta = 1.0 - BLAS.gemm(alpha, points_matrix, centers_matrix.transpose, beta, ret) - - ret - } - - private def computePointsSquareSum(points_matrix: DenseMatrix, - centers_num: Int): DenseMatrix = { - val points_num = points_matrix.numRows - val ret = DenseMatrix.zeros(points_num, centers_num) - for ((row, index) <- points_matrix.rowIter.zipWithIndex) { - val square = BLAS.dot(row, row) - for (i <- 0 until centers_num) - ret(index, i) = square - } - ret - } - - private def computeCentersSquareSum(centers_matrix: DenseMatrix, - points_num: Int): DenseMatrix = { - val centers_num = centers_matrix.numRows - val ret = DenseMatrix.zeros(points_num, centers_num) - for ((row, index) <- centers_matrix.rowIter.zipWithIndex) { - val square = BLAS.dot(row, row) - for (i <- 0 until 
points_num) - ret(i, index) = square - } - ret - } - - // use GEMM to compute matrix sum - private def computeMatrixSum(matrix1: DenseMatrix, - matrix2: DenseMatrix): DenseMatrix = { - val column_num = matrix1.numCols - val eye = DenseMatrix.eye(column_num) - val alpha = 1.0 - val beta = 1.0 - BLAS.gemm(alpha, matrix1, eye, beta, matrix2) - matrix2 - } - - private def findClosest(distances: DenseMatrix, - weights: Array[Double]): (Array[Int], Array[Double]) = { - val points_num = distances.numRows - val ret_closest = new Array[Int](points_num) - val ret_cost = new Array[Double](points_num) - - for ((row, index) <- distances.rowIter.zipWithIndex) { - var closest = 0 - var cost = row(0) - for (i <- 1 until row.size) { - if (row(i) < cost) { - closest = i - cost = row(i) - } - } - ret_closest(index) = closest - - // use weighted squared distance as cost - if (weights.nonEmpty) { - ret_cost(index) = cost * weights(index) - } else { - ret_cost(index) = cost - } - } - - (ret_closest, ret_cost) - - } - - def runWithWeight(data: RDD[InstanceBlock], - instr: Option[Instrumentation]): KMeansModel = { - - val sc = data.sparkContext - - val initStartTime = System.nanoTime() - - val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure) - - val centers = if (initializationMode == KMeansBlocksImpl.RANDOM) { - initRandom(data) - } else { - initKMeansParallel(data, distanceMeasureInstance) - } - - val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 - logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.") - - var converged = false - var cost = 0.0 - var iteration = 0 - - val iterationStartTime = System.nanoTime() - - instr.foreach(_.logNumFeatures(centers.head.vector.size)) - - // Execute iterations of Lloyd's algorithm until converged - while (iteration < maxIterations && !converged) { - // Convert center vectors to dense matrix - val centers_matrix = VectorsToDenseMatrix(centers) - - val costAccum = sc.doubleAccumulator - val bcCenters = sc.broadcast(centers_matrix) - - val centers_num = centers_matrix.numRows - val centers_dim = centers_matrix.numCols - - // Compute squared sums for points - val data_square_sums: RDD[DenseMatrix] = data.mapPartitions { blocks => - blocks.map { block => - computePointsSquareSum(DenseMatrix.fromML(block.matrix.toDense), centers_num) } - } - - // Find the new centers - val collected = data.zip(data_square_sums).flatMap { - case (blocks, points_square_sums) => - val centers_matrix = bcCenters.value - val points_num = blocks.matrix.numRows - - val sums = Array.fill(centers_num)(Vectors.zeros(centers_dim)) - val counts = Array.fill(centers_num)(0L) - - // Compute squared sums for centers - val centers_square_sums = computeCentersSquareSum(centers_matrix, points_num) - - // Compute squared distances - val distances = computeSquaredDistances( - DenseMatrix.fromML(blocks.matrix.toDense), points_square_sums, - centers_matrix, centers_square_sums) - - val (bestCenters, weightedCosts) = findClosest(distances, blocks.weights) - - for (cost <- weightedCosts) - costAccum.add(cost) - - // sums points around best center - for ((row, index) <- blocks.matrix.rowIter.zipWithIndex) { - val bestCenter = bestCenters(index) - if (blocks.weights.nonEmpty) { - axpy(blocks.weights(index), Vectors.fromML(row), sums(bestCenter)) - } else { - axpy(1, Vectors.fromML(row), sums(bestCenter)) - } - counts(bestCenter) += 1 - } - - counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator - }.reduceByKey { 
case ((sum1, count1), (sum2, count2)) => - axpy(1.0, sum2, sum1) - (sum1, count1 + count2) - }.collectAsMap() - - if (iteration == 0) { - instr.foreach(_.logNumExamples(collected.values.map(_._2).sum)) - } - - val newCenters = collected.mapValues { case (sum, count) => - distanceMeasureInstance.centroid(sum, count) - } - - bcCenters.destroy() - - // Update the cluster centers and costs - converged = true - newCenters.foreach { case (j, newCenter) => - if (converged && - !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) { - converged = false - } - centers(j) = newCenter - } - - cost = costAccum.value - iteration += 1 - } - - val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 - logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.") - - if (iteration == maxIterations) { - logInfo(s"KMeans reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"KMeans converged in $iteration iterations.") - } - - logInfo(s"The cost is $cost.") - - new KMeansModel(centers.map { c => c.vector }, - distanceMeasure, cost, iteration) - } - - /** - * Initialize a set of cluster centers at random. - */ - private def initRandom(data: RDD[InstanceBlock]): Array[VectorWithNorm] = { - - val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.matrix.rowIter) - .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) - - - // Select without replacement; may still produce duplicates if the data has < k distinct - // points, so deduplicate the centroids to match the behavior of k-means|| in the same situation - - vectorWithNorms.takeSample(false, k, new XORShiftRandom(this.seed).nextInt()) - .map(_.vector).distinct.map(new VectorWithNorm(_)) - } - - /** - * Initialize a set of cluster centers using the k-means|| algorithm by Bahmani et al. - * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. - * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. - */ - private[clustering] def initKMeansParallel(rdd_matrix: RDD[InstanceBlock], - distanceMeasureInstance: DistanceMeasure): Array[VectorWithNorm] = { - - val data: RDD[VectorWithNorm] = rdd_matrix.flatMap(_.matrix.rowIter) - .map(Vectors.fromML(_)).map(new VectorWithNorm(_)) - - // Initialize empty centers and point costs. - var costs = data.map(_ => Double.PositiveInfinity) - - // Initialize the first center to a random point. - val seed = new XORShiftRandom(this.seed).nextInt() - val sample = data.takeSample(false, 1, seed) - // Could be empty if data is empty; fail with a better message early: - require(sample.nonEmpty, s"No samples available from $data") - - val centers = ArrayBuffer[VectorWithNorm]() - var newCenters = Array(sample.head.toDense) - centers ++= newCenters - - // On each step, sample 2 * k points on average with probability proportional - // to their squared distance from the centers. Note that only distances between points - // and new centers are computed in each iteration. 
- var step = 0 - val bcNewCentersList = ArrayBuffer[Broadcast[_]]() - while (step < initializationSteps) { - val bcNewCenters = data.context.broadcast(newCenters) - bcNewCentersList += bcNewCenters - val preCosts = costs - costs = data.zip(preCosts).map { case (point, cost) => - math.min(distanceMeasureInstance.pointCost(bcNewCenters.value, point), cost) - }.persist(StorageLevel.MEMORY_AND_DISK) - val sumCosts = costs.sum() - - bcNewCenters.unpersist() - preCosts.unpersist() - - val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointCosts) => - val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - pointCosts.filter { case (_, c) => rand.nextDouble() < 2.0 * c * k / sumCosts }.map(_._1) - }.collect() - newCenters = chosen.map(_.toDense) - centers ++= newCenters - step += 1 - } - - costs.unpersist() - bcNewCentersList.foreach(_.destroy()) - - val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_)).toArray - - if (distinctCenters.length <= k) { - distinctCenters - } else { - // Finally, we might have a set of more than k distinct candidate centers; weight each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick k of them - val bcCenters = data.context.broadcast(distinctCenters) - val countMap = data - .map(distanceMeasureInstance.findClosest(bcCenters.value, _)._1) - .countByValue() - - bcCenters.destroy() - - val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray - LocalKMeans.kMeansPlusPlus(0, distinctCenters, myWeights, k, 30) - } - } -} - - -/** - * Top-level methods for calling K-means clustering. - */ -object KMeansBlocksImpl { - - // Initialization mode names - val RANDOM = "random" - val K_MEANS_PARALLEL = "k-means||" - - private[spark] def validateInitMode(initMode: String): Boolean = { - initMode match { - case KMeansBlocksImpl.RANDOM => true - case KMeansBlocksImpl.K_MEANS_PARALLEL => true - case _ => false - } - } -} - From ed92da366cfe41b81fc4710f5c2929f1780d178c Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Fri, 22 May 2020 18:54:55 +0800 Subject: [PATCH 07/10] imports style --- .../org/apache/spark/ml/clustering/KMeans.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 38fa50b9fb826..b1a605dc20e7a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -17,27 +17,28 @@ package org.apache.spark.ml.clustering +import scala.collection.mutable + import org.apache.hadoop.fs.Path + import org.apache.spark.annotation.Since +import org.apache.spark.ml.{Estimator, Model, PipelineStage} import org.apache.spark.ml.feature.{Instance, InstanceBlock} import org.apache.spark.ml.linalg.{BLAS, DenseMatrix, Matrices, Vector, Vectors} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ -import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.ml.util._ -import org.apache.spark.ml.{Estimator, Model, PipelineStage} -import org.apache.spark.mllib.clustering.{DistanceMeasure, VectorWithNorm, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel} -import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.ml.util.Instrumentation.instrumented +import org.apache.spark.mllib.clustering.{DistanceMeasure, 
KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel, VectorWithNorm} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion -import scala.collection.mutable - /** * Common params for KMeans and KMeansModel */ From 3de91f86a0f8ea0b0483e22be466cd7ade2a9247 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Fri, 22 May 2020 19:54:24 +0800 Subject: [PATCH 08/10] nit --- .../src/main/scala/org/apache/spark/ml/clustering/KMeans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index b1a605dc20e7a..2667859a8ee7c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -547,7 +547,7 @@ class KMeans @Since("1.5.0") ( logInfo(s"The cost is $cost.") val parentModel = new MLlibKMeansModel(centers.map(OldVectors.fromML(_)), - distanceMeasure, cost, iteration) + ${distanceMeasure}, cost, iteration) val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) model From 7671010828834ed87a1537fe2e9c4cb3e24e6d69 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Sat, 23 May 2020 14:35:19 +0800 Subject: [PATCH 09/10] Fix trainOnBlocks MLlibKMeans init, should pass params --- .../scala/org/apache/spark/ml/clustering/KMeans.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 2667859a8ee7c..f38274c97249e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -434,8 +434,15 @@ class KMeans @Since("1.5.0") ( val distanceMeasureInstance = DistanceMeasure.decodeFromString($(distanceMeasure)) + // Use MLlibKMeans to initialize centers val mllibKMeans = new MLlibKMeans() - + .setK($(k)) + .setInitializationMode($(initMode)) + .setInitializationSteps($(initSteps)) + .setMaxIterations($(maxIter)) + .setSeed($(seed)) + .setEpsilon($(tol)) + .setDistanceMeasure($(distanceMeasure)) val centers = if (initMode == "random") { mllibKMeans.initBlocksRandom(blocks) } else { From 9ce0e56242d68f8e311bc73447652fddc9eb0726 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Sat, 23 May 2020 16:15:24 +0800 Subject: [PATCH 10/10] sums points around best center by adding values directly to avoid copying array --- .../apache/spark/ml/clustering/KMeans.scala | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index f38274c97249e..11469cceee992 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -501,16 +501,34 @@ class KMeans @Since("1.5.0") ( costAccum.add(cost) // sums points around best 
center
-          for ((row, index) <- block.matrix.rowIter.zipWithIndex) {
-            val bestCenter = bestCenters(index)
-            if (block.weights.nonEmpty) {
-              BLAS.axpy(block.weights(index), row, sums(bestCenter))
-            } else {
-              BLAS.axpy(1, row, sums(bestCenter))
+//          for ((row, index) <- block.matrix.rowIter.zipWithIndex) {
+//            val bestCenter = bestCenters(index)
+//            if (block.weights.nonEmpty) {
+//              BLAS.axpy(block.weights(index), row, sums(bestCenter))
+//            } else {
+//              BLAS.axpy(1, row, sums(bestCenter))
+//            }
+//            counts(bestCenter) += 1
+//          }
+
+          // sums points around best center, adding values directly without copying array
+          if (block.weights.nonEmpty) {
+            for (rowIndex <- 0 until block.matrix.numRows) {
+              val bestCenter = bestCenters(rowIndex)
+              for (i <- 0 until centers_dim)
+                sums(bestCenter).toArray(i) += block.weights(rowIndex) * block.matrix(rowIndex, i)
+              counts(bestCenter) += 1
+            }
+          } else {
+            for (rowIndex <- 0 until block.matrix.numRows) {
+              val bestCenter = bestCenters(rowIndex)
+              for (i <- 0 until centers_dim)
+                sums(bestCenter).toArray(i) += block.matrix(rowIndex, i)
+              counts(bestCenter) += 1
             }
           }
-          counts(bestCenter) += 1
-        }
+
           counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
         }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
             BLAS.axpy(1.0, sum2, sum1)
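A note on the element-wise accumulation above: it matches the commented-out BLAS.axpy path because each sums(j) is created dense, and DenseVector.toArray hands back the backing values array rather than a copy, so the in-place += is visible through the vector; in the weighted branch the weight multiplies each element, exactly as axpy(block.weights(index), row, sums(bestCenter)) did. A small sketch of that equivalence, assuming it compiles somewhere under org.apache.spark.ml where the package-private BLAS is reachable:

    import org.apache.spark.ml.linalg.{BLAS, Vectors}

    val row = Vectors.dense(1.0, 2.0, 3.0)
    val weight = 0.5

    // accumulate with BLAS: viaAxpy += weight * row
    val viaAxpy = Vectors.zeros(3)
    BLAS.axpy(weight, row, viaAxpy)

    // accumulate element by element into the DenseVector's backing array
    val direct = Vectors.zeros(3)
    val values = direct.toArray          // no copy for a DenseVector
    for (i <- 0 until row.size) values(i) += weight * row(i)

    assert(viaAxpy == direct)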