diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index a42c920e24987..11469cceee992 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -23,12 +23,13 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model, PipelineStage}
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.linalg.{BLAS, DenseMatrix, Matrices, Vector, Vectors}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
-import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
+import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel, VectorWithNorm}
 import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
@@ -42,7 +43,8 @@ import org.apache.spark.util.VersionUtils.majorVersion
  * Common params for KMeans and KMeansModel
  */
 private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol
-  with HasSeed with HasPredictionCol with HasTol with HasDistanceMeasure with HasWeightCol {
+  with HasSeed with HasPredictionCol with HasTol with HasDistanceMeasure with HasWeightCol
+  with HasBlockSize {
 
   /**
    * The number of clusters to create (k). Must be > 1. Note that it is possible for fewer than
@@ -276,7 +278,9 @@ class KMeans @Since("1.5.0") (
     initMode -> MLlibKMeans.K_MEANS_PARALLEL,
     initSteps -> 2,
     tol -> 1e-4,
-    distanceMeasure -> DistanceMeasure.EUCLIDEAN)
+    distanceMeasure -> DistanceMeasure.EUCLIDEAN,
+    blockSize -> 1
+  )
 
   @Since("1.5.0")
   override def copy(extra: ParamMap): KMeans = defaultCopy(extra)
@@ -330,6 +334,25 @@ class KMeans @Since("1.5.0") (
   @Since("3.0.0")
   def setWeightCol(value: String): this.type = set(weightCol, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+   * if blockSize > 1, then vectors will be stacked into blocks, and high-level BLAS routines
+   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
+   * on the sparsity and dimension of the input dataset, the underlying BLAS implementation
+   * (for example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (for example, the
+   * number of threads).
+   * Note that existing BLAS implementations are mainly optimized for dense matrices; if the
+   * input dataset is sparse, stacking may bring no performance gain, or may even cause a
+   * performance regression.
+   * Default is 1.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.1.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
@@ -341,9 +364,9 @@ class KMeans @Since("1.5.0") (
       lit(1.0)
     }
 
-    val instances: RDD[(OldVector, Double)] = dataset
+    val instances: RDD[(Vector, Double)] = dataset
       .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map {
-        case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight)
+        case Row(point: Vector, weight: Double) => (point, weight)
       }
 
     if (handlePersistence) {
@@ -353,33 +376,289 @@ class KMeans @Since("1.5.0") (
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
-      maxIter, seed, tol, weightCol)
-    val algo = new MLlibKMeans()
-      .setK($(k))
-      .setInitializationMode($(initMode))
-      .setInitializationSteps($(initSteps))
-      .setMaxIterations($(maxIter))
-      .setSeed($(seed))
-      .setEpsilon($(tol))
-      .setDistanceMeasure($(distanceMeasure))
-    val parentModel = algo.runWithWeight(instances, Option(instr))
-    val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
+      maxIter, seed, tol, weightCol, blockSize)
+
+    val model = if ($(blockSize) == 1) {
+      trainOnRows(instances)
+    } else {
+      trainOnBlocks(instances)
+    }
+
     val summary = new KMeansSummary(
       model.transform(dataset),
       $(predictionCol),
       $(featuresCol),
       $(k),
-      parentModel.numIter,
-      parentModel.trainingCost)
-
+      model.parentModel.numIter,
+      model.parentModel.trainingCost)
     model.setSummary(Some(summary))
-    instr.logNamedValue("clusterSizes", summary.clusterSizes)
+
+    instr.logNamedValue("clusterSizes", model.summary.clusterSizes)
     if (handlePersistence) {
       instances.unpersist()
     }
     model
   }
 
+  private def trainOnRows(instances: RDD[(Vector, Double)]): KMeansModel =
+    instrumented { instr =>
+      val oldVectorInstances = instances.map {
+        case (point: Vector, weight: Double) => (OldVectors.fromML(point), weight)
+      }
+      val algo = new MLlibKMeans()
+        .setK($(k))
+        .setInitializationMode($(initMode))
+        .setInitializationSteps($(initSteps))
+        .setMaxIterations($(maxIter))
+        .setSeed($(seed))
+        .setEpsilon($(tol))
+        .setDistanceMeasure($(distanceMeasure))
+      val parentModel = algo.runWithWeight(oldVectorInstances, Option(instr))
+      copyValues(new KMeansModel(uid, parentModel).setParent(this))
+    }
+
+  private def trainOnBlocks(instances: RDD[(Vector, Double)]): KMeansModel =
+    instrumented { instr =>
+      val instanceRDD: RDD[Instance] = instances.map {
+        case (point: Vector, weight: Double) => Instance(0.0, weight, point)
+      }
+
+      val blocks = InstanceBlock.blokify(instanceRDD, $(blockSize))
+        .persist(StorageLevel.MEMORY_AND_DISK)
+        .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+      val sc = instances.sparkContext
+
+      val initStartTime = System.nanoTime()
+
+      val distanceMeasureInstance = DistanceMeasure.decodeFromString($(distanceMeasure))
+
+      // Use MLlibKMeans to initialize the cluster centers
+      val mllibKMeans = new MLlibKMeans()
+        .setK($(k))
+        .setInitializationMode($(initMode))
+        .setInitializationSteps($(initSteps))
+        .setMaxIterations($(maxIter))
+        .setSeed($(seed))
+        .setEpsilon($(tol))
+        .setDistanceMeasure($(distanceMeasure))
+      val centers = if ($(initMode) == MLlibKMeans.RANDOM) {
+        mllibKMeans.initBlocksRandom(blocks)
+      } else {
+        mllibKMeans.initBlocksKMeansParallel(blocks, distanceMeasureInstance)
+      }
+
+      val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
+      logInfo(f"Initialization with ${$(initMode)} took $initTimeInSeconds%.3f seconds.")
+
+      var converged = false
+      var cost = 0.0
+      var iteration = 0
+
+      val iterationStartTime = System.nanoTime()
+
+      instr.logNumFeatures(centers.head.size)
+
+      // Execute iterations of Lloyd's algorithm until converged
+      while (iteration < $(maxIter) && !converged) {
+        // Convert the current center vectors to a dense matrix
+        val centersMatrix = Matrices.fromVectors(centers).toDense
+
+        val costAccum = sc.doubleAccumulator
+        val bcCenters = sc.broadcast(centersMatrix)
+
+        val numCenters = centersMatrix.numRows
+        val centersDim = centersMatrix.numCols
+
+        // Compute the squared norm of every point, replicated across the center columns
+        val dataSquareSums: RDD[DenseMatrix] = blocks.map { block =>
+          computePointsSquareSum(block.matrix.toDense, numCenters)
+        }
+
+        // Find the new centers
+        val collected = blocks.zip(dataSquareSums).flatMap {
+          case (block, pointsSquareSums) =>
+            val centersMatrix = bcCenters.value
+            val numPoints = block.matrix.numRows
+
+            val sums = Array.fill(numCenters)(Vectors.zeros(centersDim))
+            val counts = Array.fill(numCenters)(0L)
+
+            // Compute the squared norm of every center, replicated across the point rows
+            val centersSquareSums = computeCentersSquareSum(centersMatrix, numPoints)
+
+            // Compute all point-to-center squared distances for this block
+            val distances = computeSquaredDistances(
+              block.matrix.toDense, pointsSquareSums,
+              centersMatrix, centersSquareSums)
+
+            val (bestCenters, weightedCosts) = findClosest(distances, block.weights)
+
+            weightedCosts.foreach(c => costAccum.add(c))
+
+            // Accumulate each point into the running sum of its closest center. Values are
+            // added in place through the dense vectors' backing arrays to avoid extra copies.
+            if (block.weights.nonEmpty) {
+              for (rowIndex <- 0 until numPoints) {
+                val bestCenter = bestCenters(rowIndex)
+                val sumValues = sums(bestCenter).toArray
+                for (i <- 0 until centersDim) {
+                  sumValues(i) += block.weights(rowIndex) * block.matrix(rowIndex, i)
+                }
+                counts(bestCenter) += 1
+              }
+            } else {
+              for (rowIndex <- 0 until numPoints) {
+                val bestCenter = bestCenters(rowIndex)
+                val sumValues = sums(bestCenter).toArray
+                for (i <- 0 until centersDim) {
+                  sumValues(i) += block.matrix(rowIndex, i)
+                }
+                counts(bestCenter) += 1
+              }
+            }
+
+            counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
+        }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
+          BLAS.axpy(1.0, sum2, sum1)
+          (sum1, count1 + count2)
+        }.collectAsMap()
+
+        if (iteration == 0) {
+          instr.logNumExamples(collected.values.map(_._2).sum)
+        }
+
+        val newCenters = collected.mapValues { case (sum, count) =>
+          distanceMeasureInstance.centroid(sum, count)
+        }
+
+        bcCenters.destroy()
+
+        // Update the cluster centers, the cost and the convergence flag
+        converged = true
+        newCenters.foreach { case (j, newCenter) =>
+          if (converged &&
+            !distanceMeasureInstance.isCenterConverged(
+              new VectorWithNorm(centers(j)), newCenter, $(tol))) {
+            converged = false
+          }
+          centers(j) = newCenter.vector
+        }
+
+        cost = costAccum.value
+        iteration += 1
+      }
+
+      val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
+      logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
+
+      if (iteration == $(maxIter)) {
+        logInfo(s"KMeans reached the max number of iterations: ${$(maxIter)}.")
+      } else {
+        logInfo(s"KMeans converged in $iteration iterations.")
+      }
+
+      logInfo(s"The cost is $cost.")
+
+      blocks.unpersist()
+
+      val parentModel = new MLlibKMeansModel(centers.map(OldVectors.fromML(_)),
+        $(distanceMeasure), cost, iteration)
+
+      copyValues(new KMeansModel(uid, parentModel).setParent(this))
+    }
+
+  private def computeSquaredDistances(
+      pointsMatrix: DenseMatrix,
+      pointsSquareSums: DenseMatrix,
+      centersMatrix: DenseMatrix,
+      centersSquareSums: DenseMatrix): DenseMatrix = {
+    // (x - y)^2 = x^2 + y^2 - 2 * x * y
+
+    // Add up the squared sums of the points and of the centers (x^2 + y^2)
+    val ret: DenseMatrix = computeMatrixSum(pointsSquareSums, centersSquareSums)
+
+    // The cross term (2 * x * y) for all pairs is a matrix product, so use GEMM:
+    // ret += -2.0 * pointsMatrix * centersMatrix^T
+    val alpha = -2.0
+    val beta = 1.0
+    BLAS.gemm(alpha, pointsMatrix, centersMatrix.transpose, beta, ret)
+
+    ret
+  }
+
+  private def computePointsSquareSum(
+      pointsMatrix: DenseMatrix,
+      numCenters: Int): DenseMatrix = {
+    val numPoints = pointsMatrix.numRows
+    val ret = DenseMatrix.zeros(numPoints, numCenters)
+    for ((row, index) <- pointsMatrix.rowIter.zipWithIndex) {
+      val square = BLAS.dot(row, row)
+      for (i <- 0 until numCenters) {
+        ret(index, i) = square
+      }
+    }
+    ret
+  }
+
+  private def computeCentersSquareSum(
+      centersMatrix: DenseMatrix,
+      numPoints: Int): DenseMatrix = {
+    val numCenters = centersMatrix.numRows
+    val ret = DenseMatrix.zeros(numPoints, numCenters)
+    for ((row, index) <- centersMatrix.rowIter.zipWithIndex) {
+      val square = BLAS.dot(row, row)
+      for (i <- 0 until numPoints) {
+        ret(i, index) = square
+      }
+    }
+    ret
+  }
+
+  // Use GEMM with an identity matrix to add matrix1 into matrix2; matrix2 is updated in place.
+  private def computeMatrixSum(
+      matrix1: DenseMatrix,
+      matrix2: DenseMatrix): DenseMatrix = {
+    val eye = DenseMatrix.eye(matrix1.numCols)
+    val alpha = 1.0
+    val beta = 1.0
+    BLAS.gemm(alpha, matrix1, eye, beta, matrix2)
+    matrix2
+  }
+
+  private def findClosest(
+      distances: DenseMatrix,
+      weights: Array[Double]): (Array[Int], Array[Double]) = {
+    val numPoints = distances.numRows
+    val retClosest = new Array[Int](numPoints)
+    val retCost = new Array[Double](numPoints)
+
+    for ((row, index) <- distances.rowIter.zipWithIndex) {
+      var closest = 0
+      var cost = row(0)
+      for (i <- 1 until row.size) {
+        if (row(i) < cost) {
+          closest = i
+          cost = row(i)
+        }
+      }
+      retClosest(index) = closest
+
+      // Use the weighted squared distance as the cost
+      if (weights.nonEmpty) {
+        retCost(index) = cost * weights(index)
+      } else {
+        retCost(index) = cost
+      }
+    }
+
+    (retClosest, retCost)
+  }
+
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 1c5de5a092d6e..14adb3d5796ad 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.annotation.Since
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.InstanceBlock
+import org.apache.spark.ml.linalg.{Vector => MLVector}
 import org.apache.spark.ml.util.Instrumentation
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.axpy
@@ -349,6 +351,22 @@ class KMeans private (
     new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration)
   }
 
+  private[spark] def initBlocksRandom(data: RDD[InstanceBlock]): Array[MLVector] = {
+    val vectorWithNorms: RDD[VectorWithNorm] = data.flatMap(_.matrix.rowIter)
+      .map(Vectors.fromML(_)).map(new VectorWithNorm(_))
+
+    initRandom(vectorWithNorms).map(_.vector.asML)
+  }
+
+  private[spark] def initBlocksKMeansParallel(
+      blocks: RDD[InstanceBlock],
+      distanceMeasureInstance: DistanceMeasure): Array[MLVector] = {
+    val data: RDD[VectorWithNorm] = blocks.flatMap(_.matrix.rowIter)
+      .map(Vectors.fromML(_)).map(new VectorWithNorm(_))
+
+    initKMeansParallel(data, distanceMeasureInstance).map(_.vector.asML)
+  }
+
   /**
    * Initialize a set of cluster centers at random.
    */
@@ -517,7 +535,7 @@ object KMeans {
 /**
  * A vector with its norm for fast distance computation.
  */
-private[clustering] class VectorWithNorm(
+private[spark] class VectorWithNorm(
     val vector: Vector,
     val norm: Double,
     val weight: Double = 1.0) extends Serializable {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 584594436267f..c34e51b658e2c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -58,6 +58,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes
     assert(kmeans.getInitSteps === 2)
     assert(kmeans.getTol === 1e-4)
     assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN)
+    assert(kmeans.getBlockSize === 1)
     val model = kmeans.setMaxIter(1).fit(dataset)
     val transformed = model.transform(dataset)
@@ -69,6 +70,19 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes
     assert(copiedModel.hasSummary)
   }
 
+  test("default parameters on blocks") {
+    val kmeans = new KMeans()
+    val model = kmeans.fit(dataset)
+    Seq(4, 16, 64).foreach { blockSize =>
+      val model2 = kmeans.setBlockSize(blockSize).fit(dataset)
+
+      model.clusterCenters.zip(model2.clusterCenters).foreach {
+        case (v1, v2) => assert(v1 ~== v2 relTol 1e-9)
+      }
+      assert(model.summary.trainingCost ~== model2.summary.trainingCost relTol 1e-9)
+    }
+  }
+
   test("set parameters") {
     val kmeans = new KMeans()
       .setK(9)
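
Note (illustration, not part of the patch): the core numerical idea behind `computeSquaredDistances` above is the expansion ||x - y||^2 = ||x||^2 + ||y||^2 - 2 * x . y, which turns all point-to-center distances of a block into a single matrix product that GEMM can evaluate. The following standalone sketch, in plain Scala with no Spark dependencies and purely illustrative names, checks the decomposition against the direct formula on a toy block of points and centers.

```scala
object SquaredDistanceDecompositionSketch {
  // points: n x d, centers: k x d, both as row-major Array[Array[Double]].
  // Direct formula: sum of squared coordinate differences.
  def direct(points: Array[Array[Double]], centers: Array[Array[Double]]): Array[Array[Double]] =
    points.map(p => centers.map(c => p.zip(c).map { case (a, b) => (a - b) * (a - b) }.sum))

  // Same result via ||x||^2 + ||y||^2 - 2 * x . y, the form the patch computes with GEMM.
  def decomposed(points: Array[Array[Double]], centers: Array[Array[Double]]): Array[Array[Double]] = {
    val pSq = points.map(p => p.map(v => v * v).sum)
    val cSq = centers.map(c => c.map(v => v * v).sum)
    points.indices.map { i =>
      centers.indices.map { j =>
        val cross = points(i).zip(centers(j)).map { case (a, b) => a * b }.sum
        pSq(i) + cSq(j) - 2.0 * cross
      }.toArray
    }.toArray
  }

  def main(args: Array[String]): Unit = {
    val points = Array(Array(1.0, 2.0), Array(3.0, 0.5), Array(-1.0, 4.0))
    val centers = Array(Array(0.0, 0.0), Array(2.0, 2.0))
    val a = direct(points, centers)
    val b = decomposed(points, centers)
    val maxDiff = a.flatten.zip(b.flatten).map { case (x, y) => math.abs(x - y) }.max
    println(s"max difference between direct and decomposed squared distances: $maxDiff")
  }
}
```

In the patch itself, the two squared-norm terms are materialized as replicated matrices and added with `computeMatrixSum`, and the cross term is folded in via `BLAS.gemm` with alpha = -2.0 and beta = 1.0.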
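Usage note (a hedged sketch, not taken from the patch): a caller opts into block-based training simply by setting the new expert `blockSize` param on the estimator; everything else is unchanged. The snippet below assumes an active SparkSession and a DataFrame named `training` with a "features" vector column, both hypothetical.

```scala
import org.apache.spark.ml.clustering.KMeans

// `training` is an assumed DataFrame with a "features" vector column.
val kmeans = new KMeans()
  .setK(3)
  .setMaxIter(20)
  .setSeed(1L)
  .setBlockSize(64)  // stack 64 rows per block so the solver can use GEMM

val model = kmeans.fit(training)
println(model.clusterCenters.mkString("\n"))
```

As the scaladoc added in this patch notes, a block size between 10 and 1000 is a reasonable starting point for dense data, while sparse inputs may see no gain or even a regression from stacking.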