From fbfdcd8946d8681c062ba08d71eac961db50417d Mon Sep 17 00:00:00 2001
From: Derrick Burns
Date: Thu, 2 Oct 2014 13:01:18 -0700
Subject: [PATCH 1/7] This commit introduces a general distance function trait, PointOps, for the Spark K-Means clusterer.

There are no public API changes*.

Distance Function Trait

The PointOps trait defines the distance function, but it is more than a simple
distance function: it also defines the Point and Center types used by the
clusterer. Standard MLlib Vectors are converted into Points and Centers. In the
FastEuclideanOps implementation of PointOps, the Point and Center types include
vector norm members. For other distance functions, such as the Kullback-Leibler
distance function, the Point and Center types carry different precomputed
values that speed up the distance calculation, much as caching vector norms
speeds up the Euclidean distance function. (A rough sketch of this trait
appears below, just before the Testing notes.) This addresses SPARK-3219.

Refactoring

To understand the original code, I found it useful to refactor the original
implementation into components. You may find it helpful, when reviewing this
pull request, to look at the new components and compare them to their original
implementations; unfortunately, the GitHub diff does not help very much with
this. This commit splits the clusterer into a number of components that behave
(largely) like their predecessors. KMeansParallel implements the K-Means||
initialization algorithm. KMeansRandom implements the K-Means random
initialization algorithm. MultiKMeans runs the K-Means algorithm on multiple
sets of cluster centers using a given distance function. Traits for the
initializer, KMeansInitializer, and for the general K-Means clusterer,
MultiKMeansClusterer, are provided to highlight the salient interfaces, with
the intent that alternative implementations of these interfaces may be
provided in the future.

Performance

This pull request is not focused on performance. Nevertheless, the performance
of the K-Means++ implementation was dramatically improved by NOT recomputing
distances to cluster centers that were already present in previous steps; this
turns a quadratic implementation into a linear one. Second, the K-Means++
implementation uses the general K-Means clusterer in its final step, which
parallelizes a step that was previously sequential. Together, these changes
address SPARK-3424.

Next Steps

This pull request does not introduce new user-visible changes. The next step is
to make different distance functions available through a user-visible API. I
will provide other distance functions after this pull request has been
accepted; then we can decide on an appropriate user-level API to access those
functions.

Compatibility

While there are no user-level API changes, the behavior of the clusterer
differs on some tests. Specifically, the handling of empty clusters has
changed: empty clusters are no longer filled with random points. The former
behavior is undesirable for a number of reasons, not the least of which is that
there is no reasonable use for duplicate cluster centers. The test cases were
changed accordingly. This addresses SPARK-3261. The private KMeans constructor
that was used by some Java test code and one example was replaced with a Scala
constructor that is not Java friendly. Since that constructor was not user
visible, I simply changed the Java test code and the example to use the
higher-level interface.
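For reference, here is a rough sketch of the PointOps trait and its FP base
type as they are used by this patch. The actual definitions live in
org.apache.spark.mllib.base, which is not part of this diff, so the member
names and signatures shown here are inferred from usage and may differ in
detail (conversions to and from the mutable Centroid accumulator used by
MultiKMeans are omitted):

    import org.apache.spark.mllib.linalg.Vector

    // A point or center with an attached weight.
    trait FP extends Serializable {
      def weight: Double
    }

    trait PointOps[P <: FP, C <: FP] extends Serializable {
      // Distance from a point to a center. Implementations may stop early and
      // return upperBound once the distance is known to exceed it.
      def distance(p: P, c: C, upperBound: Double): Double

      // Conversions between MLlib vectors and the clusterer's internal types.
      def vectorToPoint(v: Vector): P
      def centerToVector(c: C): Vector
      def pointToCenter(p: P): C
      def centerToPoint(c: C): P

      // True if the center moved far enough that its run has not yet converged.
      def centerMoved(p: P, c: C): Boolean

      // Index of the closest center to the point, and the distance to it.
      def findClosest(centers: Array[C], p: P): (Int, Double)

      // Cost of a point against a set of centers (distance to the closest one).
      def pointCost(centers: Array[C], p: P): Double = findClosest(centers, p)._2
    }

Implementations added in this patch, EuclideanOps and FastEuclideanOps, supply
these conversions; FastEuclideanOps caches squared norms in its Point and
Center type so that distance computations are cheaper.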
Testing This code has been tested (albeit while packaged outside of Spark) and performance measured on data sets of millions of features each with hundreds of dimensions and on tens of thousands of clusters. --- .../spark/examples/mllib/DenseKMeans.scala | 6 +- .../mllib/api/python/PythonMLLibAPI.scala | 9 +- .../clustering/GeneralizedKMeansModel.scala | 59 +++ .../spark/mllib/clustering/KMeans.scala | 436 ++---------------- .../mllib/clustering/KMeansInitializer.scala | 25 + .../spark/mllib/clustering/KMeansModel.scala | 33 +- .../mllib/clustering/KMeansParallel.scala | 153 ++++++ .../mllib/clustering/KMeansPlusPlus.scala | 200 ++++++++ .../spark/mllib/clustering/KMeansRandom.scala | 64 +++ .../spark/mllib/clustering/LocalKMeans.scala | 127 ----- .../spark/mllib/clustering/MultiKMeans.scala | 134 ++++++ .../clustering/MultiKMeansClusterer.scala | 28 ++ .../clustering/metrics/EuclideanOps.scala | 67 +++ .../clustering/metrics/FastEuclideanOps.scala | 80 ++++ .../spark/mllib/clustering/package.scala | 142 ++++++ .../mllib/clustering/JavaKMeansSuite.java | 11 +- .../spark/mllib/clustering/KMeansSuite.scala | 20 +- 17 files changed, 1011 insertions(+), 583 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index 89dfa26c2299c..9dc979dbd979a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -94,11 +94,7 @@ object DenseKMeans { case Parallel => KMeans.K_MEANS_PARALLEL } - val model = new KMeans() - .setInitializationMode(initMode) - .setK(params.k) - .setMaxIterations(params.numIterations) - .run(examples) + val model = KMeans.train(examples, params.k, params.numIterations, 1, initMode) val cost = model.computeCost(examples) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e9f41758581e3..1026a0352fbdc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -246,14 +246,7 @@ class PythonMLLibAPI extends Serializable { maxIterations: Int, runs: Int, initializationMode: String): KMeansModel = { - val kMeansAlg = new KMeans() - .setK(k) - .setMaxIterations(maxIterations) - 
.setRuns(runs) - .setInitializationMode(initializationMode) - // Disable the uncached input warning because 'data' is a deliberately uncached MappedRDD. - .disableUncachedWarning() - return kMeansAlg.run(data.rdd) + return KMeans.train(data,k, maxIterations,runs,initializationMode) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala new file mode 100644 index 0000000000000..988ebda7aca31 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.SparkContext._ +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.base.{PointOps, FP} +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.rdd.RDD + +/** + * A clustering model for K-means. Each point belongs to the cluster with the closest center. + */ +private[mllib] class GeneralizedKMeansModel[P<:FP, C<:FP]( + val pointOps: PointOps[P, C], + val centers: Array[C]) + extends Serializable { + + val k: Int = clusterCenters.length + + def clusterCenters : Array[Vector] = centers.map{ c => pointOps.centerToVector(c) } + + /** Returns the cluster index that a given point belongs to. */ + def predict(point: Vector): Int = + pointOps.findClosest(centers, pointOps.vectorToPoint(point))._1 + + + /** Maps given points to their cluster indices. */ + def predict(points: RDD[Vector]): RDD[Int] = + points.map(p => pointOps.findClosest(centers, pointOps.vectorToPoint(p))._1) + + + /** Maps given points to their cluster indices. */ + def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = + predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] + + /** + * Return the K-means cost (sum of squared distances of points to their nearest center) for this + * model on the given data. 
+ */ + def computeCost(data: RDD[Vector]): Double = + data.map(p => pointOps.findClosest(centers, pointOps.vectorToPoint(p))._2).sum() + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 7443f232ec3e7..762e2bd733cef 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -17,429 +17,57 @@ package org.apache.spark.mllib.clustering -import scala.collection.mutable.ArrayBuffer -import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} - -import org.apache.spark.annotation.Experimental -import org.apache.spark.Logging -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.linalg.{Vector, Vectors} -import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.base.{FP, PointOps} +import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.random.XORShiftRandom - -/** - * K-means clustering with support for multiple parallel runs and a k-means++ like initialization - * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested, - * they are executed together with joint passes over the data for efficiency. - * - * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given - * to it should be cached by the user. - */ -class KMeans private ( - private var k: Int, - private var maxIterations: Int, - private var runs: Int, - private var initializationMode: String, - private var initializationSteps: Int, - private var epsilon: Double) extends Serializable with Logging { - - /** - * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, - * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}. - */ - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) - - /** Set the number of clusters to create (k). Default: 2. */ - def setK(k: Int): this.type = { - this.k = k - this - } - - /** Set maximum number of iterations to run. Default: 20. */ - def setMaxIterations(maxIterations: Int): this.type = { - this.maxIterations = maxIterations - this - } - - /** - * Set the initialization algorithm. This can be either "random" to choose random points as - * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ - * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. - */ - def setInitializationMode(initializationMode: String): this.type = { - if (initializationMode != KMeans.RANDOM && initializationMode != KMeans.K_MEANS_PARALLEL) { - throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode) - } - this.initializationMode = initializationMode - this - } - - /** - * :: Experimental :: - * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm - * this many times with random starting conditions (configured by the initialization mode), then - * return the best clustering found over any run. Default: 1. - */ - @Experimental - def setRuns(runs: Int): this.type = { - if (runs <= 0) { - throw new IllegalArgumentException("Number of runs must be positive") - } - this.runs = runs - this - } - - /** - * Set the number of steps for the k-means|| initialization mode. 
This is an advanced - * setting -- the default of 5 is almost always enough. Default: 5. - */ - def setInitializationSteps(initializationSteps: Int): this.type = { - if (initializationSteps <= 0) { - throw new IllegalArgumentException("Number of initialization steps must be positive") - } - this.initializationSteps = initializationSteps - this - } - - /** - * Set the distance threshold within which we've consider centers to have converged. - * If all centers move less than this Euclidean distance, we stop iterating one run. - */ - def setEpsilon(epsilon: Double): this.type = { - this.epsilon = epsilon - this - } - - /** Whether a warning should be logged if the input RDD is uncached. */ - private var warnOnUncachedInput = true - - /** Disable warnings about uncached input. */ - private[spark] def disableUncachedWarning(): this.type = { - warnOnUncachedInput = false - this - } - - /** - * Train a K-means model on the given set of points; `data` should be cached for high - * performance, because this is an iterative algorithm. - */ - def run(data: RDD[Vector]): KMeansModel = { - - if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { - logWarning("The input data is not directly cached, which may hurt performance if its" - + " parent RDDs are also uncached.") - } - - // Compute squared norms and cache them. - val norms = data.map(v => breezeNorm(v.toBreeze, 2.0)) - norms.persist() - val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) => - new BreezeVectorWithNorm(v, norm) - } - val model = runBreeze(breezeData) - norms.unpersist() - - // Warn at the end of the run as well, for increased visibility. - if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) { - logWarning("The input data was not directly cached, which may hurt performance if its" - + " parent RDDs are also uncached.") - } - model - } - - /** - * Implementation of K-Means using breeze. 
- */ - private def runBreeze(data: RDD[BreezeVectorWithNorm]): KMeansModel = { - - val sc = data.sparkContext - - val initStartTime = System.nanoTime() - - val centers = if (initializationMode == KMeans.RANDOM) { - initRandom(data) - } else { - initKMeansParallel(data) - } - - val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9 - logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) + - " seconds.") - - val active = Array.fill(runs)(true) - val costs = Array.fill(runs)(0.0) - - var activeRuns = new ArrayBuffer[Int] ++ (0 until runs) - var iteration = 0 - - val iterationStartTime = System.nanoTime() - - // Execute iterations of Lloyd's algorithm until all runs have converged - while (iteration < maxIterations && !activeRuns.isEmpty) { - type WeightedPoint = (BV[Double], Long) - def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { - (p1._1 += p2._1, p1._2 + p2._2) - } - - val activeCenters = activeRuns.map(r => centers(r)).toArray - val costAccums = activeRuns.map(_ => sc.accumulator(0.0)) - - val bcActiveCenters = sc.broadcast(activeCenters) - - // Find the sum and count of points mapping to each center - val totalContribs = data.mapPartitions { points => - val thisActiveCenters = bcActiveCenters.value - val runs = thisActiveCenters.length - val k = thisActiveCenters(0).length - val dims = thisActiveCenters(0)(0).vector.length - - val sums = Array.fill(runs, k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]]) - val counts = Array.fill(runs, k)(0L) - - points.foreach { point => - (0 until runs).foreach { i => - val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) - costAccums(i) += cost - sums(i)(bestCenter) += point.vector - counts(i)(bestCenter) += 1 - } - } - - val contribs = for (i <- 0 until runs; j <- 0 until k) yield { - ((i, j), (sums(i)(j), counts(i)(j))) - } - contribs.iterator - }.reduceByKey(mergeContribs).collectAsMap() - - // Update the cluster centers and costs for each active run - for ((run, i) <- activeRuns.zipWithIndex) { - var changed = false - var j = 0 - while (j < k) { - val (sum, count) = totalContribs((i, j)) - if (count != 0) { - sum /= count.toDouble - val newCenter = new BreezeVectorWithNorm(sum) - if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { - changed = true - } - centers(run)(j) = newCenter - } - j += 1 - } - if (!changed) { - active(run) = false - logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") - } - costs(run) = costAccums(i).value - } - - activeRuns = activeRuns.filter(active(_)) - iteration += 1 - } - - val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9 - logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + " seconds.") - - if (iteration == maxIterations) { - logInfo(s"KMeans reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"KMeans converged in $iteration iterations.") - } - - val (minCost, bestRun) = costs.zipWithIndex.min - - logInfo(s"The cost for the best run is $minCost.") - - new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector))) - } - - /** - * Initialize `runs` sets of cluster centers at random. 
- */ - private def initRandom(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq - Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => - new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm) - }.toArray) - } - - /** - * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. - * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries - * to find with dissimilar cluster centers by starting with a random center and then doing - * passes where more centers are chosen with probability proportional to their squared distance - * to the current cluster set. It results in a provable approximation to an optimal clustering. - * - * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. - */ - private def initKMeansParallel(data: RDD[BreezeVectorWithNorm]) - : Array[Array[BreezeVectorWithNorm]] = { - // Initialize each run's center to a random point - val seed = new XORShiftRandom().nextInt() - val sample = data.takeSample(true, runs, seed).toSeq - val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) - - // On each step, sample 2 * k points on average for each run with probability proportional - // to their squared distance from that run's current centers - var step = 0 - while (step < initializationSteps) { - val bcCenters = data.context.broadcast(centers) - val sumCosts = data.flatMap { point => - (0 until runs).map { r => - (r, KMeans.pointCost(bcCenters.value(r), point)) - } - }.reduceByKey(_ + _).collectAsMap() - val chosen = data.mapPartitionsWithIndex { (index, points) => - val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) - points.flatMap { p => - (0 until runs).filter { r => - rand.nextDouble() < 2.0 * KMeans.pointCost(bcCenters.value(r), p) * k / sumCosts(r) - }.map((_, p)) - } - }.collect() - chosen.foreach { case (r, p) => - centers(r) += p.toDense - } - step += 1 - } +import org.apache.spark.Logging - // Finally, we might have a set of more than k candidate centers for each run; weigh each - // candidate by the number of points in the dataset mapping to it and run a local k-means++ - // on the weighted centers to pick just k of them - val bcCenters = data.context.broadcast(centers) - val weightMap = data.flatMap { p => - (0 until runs).map { r => - ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) - } - }.reduceByKey(_ + _).collectAsMap() - val finalCenters = (0 until runs).map { r => - val myCenters = centers(r).toArray - val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray - LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) - } - finalCenters.toArray - } -} +import scala.reflect.ClassTag +import org.apache.spark.mllib.linalg.Vector -/** - * Top-level methods for calling K-means clustering. - */ -object KMeans { +object KMeans extends Logging { // Initialization mode names val RANDOM = "random" val K_MEANS_PARALLEL = "k-means||" - /** - * Trains a k-means model using the given set of parameters. - * - * @param data training points stored as `RDD[Array[Double]]` - * @param k number of clusters - * @param maxIterations max number of iterations - * @param runs number of parallel runs, defaults to 1. The best model is returned. 
- * @param initializationMode initialization model, either "random" or "k-means||" (default). - */ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int, - initializationMode: String): KMeansModel = { - new KMeans().setK(k) - .setMaxIterations(maxIterations) - .setRuns(runs) - .setInitializationMode(initializationMode) - .run(data) - } + def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, mode: String): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs, mode)._2) /** * Trains a k-means model using specified parameters and the default values for unspecified. */ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int): KMeansModel = { - train(data, k, maxIterations, 1, K_MEANS_PARALLEL) - } + def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations)._2) - /** - * Trains a k-means model using specified parameters and the default values for unspecified. - */ - def train( - data: RDD[Vector], - k: Int, - maxIterations: Int, - runs: Int): KMeansModel = { - train(data, k, maxIterations, runs, K_MEANS_PARALLEL) - } /** - * Returns the index of the closest center to the given point, as well as the squared distance. + * Trains a k-means model using specified parameters and the default values for unspecified. */ - private[mllib] def findClosest( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): (Int, Double) = { - var bestDistance = Double.PositiveInfinity - var bestIndex = 0 - var i = 0 - centers.foreach { center => - // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower bound to avoid unnecessary - // distance computation. - var lowerBoundOfSqDist = center.norm - point.norm - lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist - if (lowerBoundOfSqDist < bestDistance) { - val distance: Double = fastSquaredDistance(center, point) - if (distance < bestDistance) { - bestDistance = distance - bestIndex = i - } - } - i += 1 + def train( data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel = + new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs)._2) + + + def doTrain[P <: FP, C <: FP](pointOps: PointOps[P, C])( + raw: RDD[Vector], + k: Int = 2, + maxIterations: Int = 20, + runs: Int = 1, + initializationMode: String = K_MEANS_PARALLEL, + initializationSteps: Int = 5, + epsilon: Double = 1e-4)(implicit ctag: ClassTag[C], ptag: ClassTag[P]) + : (Double, GeneralizedKMeansModel[P, C]) = { + + val initializer = if (initializationMode == RANDOM) { + new KMeansRandom(pointOps, k, runs) + } else { + new KMeansParallel(pointOps, k, runs, initializationSteps, 1) } - (bestIndex, bestDistance) - } - - /** - * Returns the K-means cost of a given point against the given cluster centers. - */ - private[mllib] def pointCost( - centers: TraversableOnce[BreezeVectorWithNorm], - point: BreezeVectorWithNorm): Double = - findClosest(centers, point)._2 - - /** - * Returns the squared Euclidean distance between two vectors computed by - * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]]. 
- */ - private[clustering] def fastSquaredDistance( - v1: BreezeVectorWithNorm, - v2: BreezeVectorWithNorm): Double = { - MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm) + val data = (raw map { vals => pointOps.vectorToPoint(vals) }).cache() + val centers = initializer.init(data, 0) + new MultiKMeans(pointOps, maxIterations).cluster(data, centers) } } - -/** - * A breeze vector with its norm for fast distance computation. - * - * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]] - */ -private[clustering] -class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable { - - def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0)) - - def this(array: Array[Double]) = this(new BDV[Double](array)) - - def this(v: Vector) = this(v.toBreeze) - - /** Converts the vector to a dense vector. */ - def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm) -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala new file mode 100644 index 0000000000000..6d8c0628b1fd5 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansInitializer.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.mllib.base.FP +import org.apache.spark.rdd.RDD + +private[mllib] trait KMeansInitializer[P <: FP, C <: FP] extends Serializable { + def init(data: RDD[P], seed: Int): Array[Array[C]] +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 12a3d91cd31a6..02199668fd3f5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -25,37 +25,28 @@ import org.apache.spark.mllib.linalg.Vector /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. */ -class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable { - /** Total number of clusters. */ - def k: Int = clusterCenters.length + +class KMeansModel(specific: GeneralizedKMeansModel[_,_]) { + + val k: Int = specific.k /** Returns the cluster index that a given point belongs to. */ - def predict(point: Vector): Int = { - KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1 - } + def predict(point: Vector): Int = specific.predict(point) + /** Maps given points to their cluster indices. 
*/ - def predict(points: RDD[Vector]): RDD[Int] = { - val centersWithNorm = clusterCentersWithNorm - val bcCentersWithNorm = points.context.broadcast(centersWithNorm) - points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))._1) - } + def predict(points: RDD[Vector]): RDD[Int] = specific.predict(points) + /** Maps given points to their cluster indices. */ - def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = - predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] + def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = specific.predict(points) /** * Return the K-means cost (sum of squared distances of points to their nearest center) for this * model on the given data. */ - def computeCost(data: RDD[Vector]): Double = { - val centersWithNorm = clusterCentersWithNorm - val bcCentersWithNorm = data.context.broadcast(centersWithNorm) - data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))).sum() - } - - private def clusterCentersWithNorm: Iterable[BreezeVectorWithNorm] = - clusterCenters.map(new BreezeVectorWithNorm(_)) + def computeCost(data: RDD[Vector]): Double = specific.computeCost(data) + + def clusterCenters: Array[Vector] = specific.clusterCenters } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala new file mode 100644 index 0000000000000..6a001a9ec2754 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.base.{ PointOps, FP, Zero } +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], + k: Int, + runs: Int, + initializationSteps: Int, + numPartitions: Int) + extends KMeansInitializer[P, C] with Logging { + + /** + * Initialize `runs` sets of cluster centers using the k-means|| algorithm by Bahmani et al. + * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of k-means++ that tries + * to find dissimilar cluster centers by starting with a random center and then doing + * passes where more centers are chosen with probability proportional to their squared distance + * to the current cluster set. It results in a provable approximation to an optimal clustering. 
+ * + * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf. + * + * @param data the RDD of points + * @param seed the random number generator seed + * @return + */ + def init(data: RDD[P], seed: Int): Array[Array[C]] = { + log.debug("k-means parallel on {} points" + data.count()) + + // randomly select one center per run, putting each into a separate array buffer + val sample = data.takeSample(true, runs, seed).toSeq.map(pointOps.pointToCenter) + val centers: Array[ArrayBuffer[C]] = Array.tabulate(runs)(r => ArrayBuffer(sample(r))) + + // add at most 2k points per step + for (step <- 0 until initializationSteps) { + if (log.isInfoEnabled) showCenters(centers, step) + val centerArrays = centers.map { x: ArrayBuffer[C] => x.toArray } + val bcCenters = data.sparkContext.broadcast(centerArrays) + for ((r, p) <- choose(data, seed, step, bcCenters)) { + centers(r) += pointOps.pointToCenter(p) + } + bcCenters.unpersist() + } + + val bcCenters = data.sparkContext.broadcast(centers.map(_.toArray)) + val result = finalCenters(data, bcCenters, seed) + bcCenters.unpersist() + result + } + + def showCenters(centers: Array[ArrayBuffer[C]], step: Int) { + log.info("step {}", step) + for (run <- 0 until runs) { + log.info("final: run {} has {} centers", run, centers.length) + } + } + + /** + * Randomly choose at most 2 * k additional cluster centers by weighting them by their distance + * to the current closest cluster + * + * @param data the RDD of points + * @param seed random generator seed + * @param step which step of the selection process + * @return array of (run, point) + */ + def choose(data: RDD[P], seed: Int, step: Int, bcCenters: Broadcast[Array[Array[C]]]) + : Array[(Int, P)] = { + // compute the weighted distortion for each run + val sumCosts = data.flatMap { + point => + val centers = bcCenters.value + for (r <- 0 until runs) yield { + (r, point.weight * pointOps.pointCost(centers(r), point)) + } + }.reduceByKey(_ + _).collectAsMap() + + // choose points in proportion to ratio of weighted cost to weighted distortion + data.mapPartitionsWithIndex { + (index, points: Iterator[P]) => + val centers = bcCenters.value + val range = 0 until runs + val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) + points.flatMap { p => + range.filter { r => + rand.nextDouble() < 2.0 * p.weight * pointOps.pointCost(centers(r), p) * k / sumCosts(r) + }.map((_, p)) + } + }.collect() + } + + /** + * Reduce sets of candidate cluster centers to at most k points per set using KMeansPlusPlus. + * Weight the points by the distance to the closest cluster center. 
+ * + * @param data original points + * @param bcCenters array of sets of candidate centers + * @param seed random number seed + * @return array of sets of cluster centers + */ + def finalCenters(data: RDD[P], bcCenters: Broadcast[Array[Array[C]]], seed: Int) + : Array[Array[C]] = { + // for each (run, cluster) compute the sum of the weights of the points in the cluster + val weightMap = data.flatMap { + point => + val centers = bcCenters.value + for (r <- 0 until runs) yield { + ((r, pointOps.findClosest(centers(r), point)._1), point.weight) + } + }.reduceByKey(_ + _).collectAsMap() + + val centers = bcCenters.value + val kmeansPlusPlus = new KMeansPlusPlus(pointOps) + val trackingKmeans = new MultiKMeans(pointOps, 30) + val finalCenters = (0 until runs).map { + r => + val myCenters = centers(r).toArray + log.info("run {} has {} centers", r, myCenters.length) + val weights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), Zero)).toArray + // Check for the degenerate case that the number of centers available is less than k. + val kx = if (k > myCenters.length) myCenters.length else k + val sc = data.sparkContext + val initial = kmeansPlusPlus.getCenters(sc, seed, myCenters, weights, kx, numPartitions, 1) + trackingKmeans.cluster(data.sparkContext.parallelize(myCenters.map(pointOps.centerToPoint)), + Array(initial))._2.centers + } + finalCenters.toArray + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala new file mode 100644 index 0000000000000..dedd8032e2c7b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.mllib.base.{PointOps, FP, Infinity, One, Zero} +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{Logging, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** + * + * The KMeans++ initialization algorithm + * + * @param pointOps distance function + * @tparam P point type + * @tparam C center type + */ +private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( + pointOps: PointOps[P, C]) extends Serializable with Logging { + + /** + * We will maintain for each point the distance to its closest cluster center. + * Since only one center is added on each iteration, recomputing the closest cluster center + * only requires computing the distance to the new cluster center if + * that distance is less than the closest cluster center. 
+ */ + case class FatPoint(location: P, index: Int, weight: Double, distance: Double) + + /** + * K-means++ on the weighted point set `points`. This first does the K-means++ + * initialization procedure and then rounds of Lloyd's algorithm. + */ + + def cluster( + sc: SparkContext, + seed: Int, + points: Array[C], + weights: Array[Double], + k: Int, + maxIterations: Int, + numPartitions: Int): Array[C] = { + val centers: Array[C] = getCenters(sc, seed, points, weights, k, numPartitions, 1) + val pts = sc.parallelize(points.map(pointOps.centerToPoint)) + new MultiKMeans(pointOps, maxIterations).cluster(pts, Array(centers))._2.centers + } + + /** + * Select centers in rounds. On each round, select 'perRound' centers, with probability of + * selection equal to the product of the given weights and distance to the closest cluster center + * of the previous round. + * + * @param sc the Spark context + * @param seed a random number seed + * @param points the candidate centers + * @param weights the weights on the candidate centers + * @param k the total number of centers to select + * @param numPartitions the number of data partitions to use + * @param perRound the number of centers to add per round + * @return an array of at most k cluster centers + */ + def getCenters(sc: SparkContext, seed: Int, points: Array[C], weights: Array[Double], k: Int, + numPartitions: Int, perRound: Int): Array[C] = { + assert(points.length > 0) + assert(k > 0) + assert(numPartitions > 0) + assert(perRound > 0) + + if (points.length < k) log.warn("number of clusters requested {} exceeds number of points {}", + k, points.length) + val centers = new ArrayBuffer[C](k) + val rand = new XORShiftRandom(seed) + centers += points(pickWeighted(rand, weights)) + log.info("starting kMeansPlusPlus initialization on {} points", points.length) + + var more = true + var fatPoints = initialFatPoints(points, weights) + fatPoints = updateDistances(fatPoints, centers.view.take(1)) + + while (centers.length < k && more) { + val chosen = choose(fatPoints, seed ^ (centers.length << 24), rand, perRound) + val newCenters = chosen.map(points(_)) + fatPoints = updateDistances(fatPoints, newCenters) + log.info("chose {} points", chosen.length) + for (index <- chosen) { + log.info(" center({}) = points({})", centers.length, index) + centers += points(index) + } + more = chosen.nonEmpty + } + val result = centers.take(k) + log.info("completed kMeansPlusPlus initialization with {} centers of {} requested", + result.length, k) + result.toArray + } + + /** + * Choose points + * + * @param fatPoints points to choose from + * @param seed random number seed + * @param rand random number generator + * @param count number of points to choose + * @return indices of chosen points + */ + def choose(fatPoints: Array[FatPoint], seed: Int, rand: XORShiftRandom, count: Int) = + (0 until count).flatMap { x => pickCenter(rand, fatPoints.iterator)}.map { _.index} + + /** + * Create initial fat points with weights given and infinite distance to closest cluster center. + * @param points points + * @param weights weights of points + * @return fat points with given weighs and infinite distance to closest cluster center + */ + def initialFatPoints(points: Array[C], weights: Array[Double]): Array[FatPoint] = + (0 until points.length).map{ i => FatPoint( pointOps.centerToPoint(points(i)), i, weights(i), + Infinity)}.toArray + + /** + * Update the distance of each point to its closest cluster center, given only the given cluster + * centers that were modified. 
+ * + * @param points set of candidate initial cluster centers + * @param center new cluster center + * @return points with their distance to closest to cluster center updated + */ + + def updateDistances(points: Array[FatPoint], center: Seq[C]): Array[FatPoint] = + points.map { p => + var i = 0 + val to = center.length + var dist = p.distance + val point = p.location + while (i < to) { + dist = pointOps.distance(point, center(i), dist) + i = i + 1 + } + p.copy(distance=dist) + } + + /** + * Pick a point at random, weighing the choices by the given weight vector. + * Return -1 if all weights are 0.0 + * + * Checks for illegal weight vector and throws exception instead of returning -1 + * + * @param rand random number generator + * @param weights the weights of the points + * @return the index of the point chosen + */ + def pickWeighted(rand: XORShiftRandom, weights: Array[Double]): Int = { + val r = rand.nextDouble() * weights.sum + var i = 0 + var curWeight = 0.0 + while (i < weights.length && curWeight < r) { + assert(weights(i) >= 0.0) + curWeight += weights(i) + i += 1 + } + if (i == 0) throw new IllegalArgumentException("all weights are zero") + i - 1 + } + + /** + * + * Select point randomly with probability weighted by the product of the weight and the distance + * + * @param rand random number generator + * @return + */ + def pickCenter(rand: XORShiftRandom, fatPoints: Iterator[FatPoint]): Array[FatPoint] = { + var cumulative = Zero + val rangeAndIndexedPoints = fatPoints map { z => + val weightedDistance = z.weight * z.distance + val from = cumulative + cumulative = cumulative + weightedDistance + (from, cumulative, z.location, z.index) + } + val pts = rangeAndIndexedPoints.toArray + val total = pts.last._2 + val r = rand.nextDouble() * total + for (w <- pts if w._1 <= r && r < w._2) yield FatPoint(w._3, w._4, One, total) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala new file mode 100644 index 0000000000000..8d362c6f86ea8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.mllib.base.{FP, PointOps} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + +import scala.collection.immutable.IndexedSeq +import scala.reflect.ClassTag + +/** + * Generate a random set of cluster centers from the points. + * + * Checks that no zero weight points are allowed to be cluster centers. + * This is important for some distance functions that require points with non-zero weights. 
+ * When all weights are one (as is the case with the Euclidean distance function) this filter + * has no effect. + + * @param pointOps distance function + * @param k number of desired clusters + * @param runs number of sets of cluster centers to generate + */ + +private[mllib] class KMeansRandom[P <: FP : ClassTag, C <: FP : ClassTag]( + pointOps: PointOps[P,C], + k: Int, + runs: Int) + extends KMeansInitializer[P,C] { + + def init(data: RDD[P], seed: Int): Array[Array[C]] = { + + val filtered = data.filter(_.weight > 0) + val count = filtered.count() + if( runs * k <= count ) { + val x = filtered.takeSample(withReplacement=false, runs * k, new XORShiftRandom().nextInt()) + val centers = x.map(pointOps.pointToCenter).toSeq + Array.tabulate(runs)(r => centers.slice(r * k, (r + 1) * k).toArray) + } else if( k < count ) { + (0 to runs).toArray.map{ _ => {filtered.takeSample(withReplacement=false, k, + new XORShiftRandom().nextInt()).map(pointOps.pointToCenter)}} + } else { + (0 to runs).toArray.map { _ => filtered.collect().map(pointOps.pointToCenter) } + } + } + + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala deleted file mode 100644 index f0722d7c14a46..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.clustering - -import scala.util.Random - -import breeze.linalg.{Vector => BV, DenseVector => BDV, norm => breezeNorm} - -import org.apache.spark.Logging - -/** - * An utility object to run K-means locally. This is private to the ML package because it's used - * in the initialization of KMeans but not meant to be publicly exposed. - */ -private[mllib] object LocalKMeans extends Logging { - - /** - * Run K-means++ on the weighted point set `points`. This first does the K-means++ - * initialization procedure and then rounds of Lloyd's algorithm. - */ - def kMeansPlusPlus( - seed: Int, - points: Array[BreezeVectorWithNorm], - weights: Array[Double], - k: Int, - maxIterations: Int - ): Array[BreezeVectorWithNorm] = { - val rand = new Random(seed) - val dimensions = points(0).vector.length - val centers = new Array[BreezeVectorWithNorm](k) - - // Initialize centers by sampling using the k-means++ procedure. 
- centers(0) = pickWeighted(rand, points, weights).toDense - for (i <- 1 until k) { - // Pick the next center with a probability proportional to cost under current centers - val curCenters = centers.view.take(i) - val sum = points.view.zip(weights).map { case (p, w) => - w * KMeans.pointCost(curCenters, p) - }.sum - val r = rand.nextDouble() * sum - var cumulativeScore = 0.0 - var j = 0 - while (j < points.length && cumulativeScore < r) { - cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j)) - j += 1 - } - if (j == 0) { - logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." + - s" Using duplicate point for center k = $i.") - centers(i) = points(0).toDense - } else { - centers(i) = points(j - 1).toDense - } - } - - // Run up to maxIterations iterations of Lloyd's algorithm - val oldClosest = Array.fill(points.length)(-1) - var iteration = 0 - var moved = true - while (moved && iteration < maxIterations) { - moved = false - val counts = Array.fill(k)(0.0) - val sums = Array.fill(k)( - BDV.zeros[Double](dimensions).asInstanceOf[BV[Double]] - ) - var i = 0 - while (i < points.length) { - val p = points(i) - val index = KMeans.findClosest(centers, p)._1 - breeze.linalg.axpy(weights(i), p.vector, sums(index)) - counts(index) += weights(i) - if (index != oldClosest(i)) { - moved = true - oldClosest(i) = index - } - i += 1 - } - // Update centers - var j = 0 - while (j < k) { - if (counts(j) == 0.0) { - // Assign center to a random point - centers(j) = points(rand.nextInt(points.length)).toDense - } else { - sums(j) /= counts(j) - centers(j) = new BreezeVectorWithNorm(sums(j)) - } - j += 1 - } - iteration += 1 - } - - if (iteration == maxIterations) { - logInfo(s"Local KMeans++ reached the max number of iterations: $maxIterations.") - } else { - logInfo(s"Local KMeans++ converged in $iteration iterations.") - } - - centers - } - - private def pickWeighted[T](rand: Random, data: Array[T], weights: Array[Double]): T = { - val r = rand.nextDouble() * weights.sum - var i = 0 - var curWeight = 0.0 - while (i < data.length && curWeight < r) { - curWeight += weights(i) - i += 1 - } - data(i - 1) - } -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala new file mode 100644 index 0000000000000..766ef05b286bb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.Accumulator +import org.apache.spark.SparkContext._ +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero} +import org.apache.spark.rdd.RDD + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + + +/** + * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously, + * returning the one with the lowest cost. + * + * We only compute if a center has moved if we need to. + * + * We use null to represent empty clusters instead of an Option type to save space. + * + * The resulting clustering may contain fewer than K clusters. + */ + +private[mllib] class MultiKMeans[P <: FP : ClassTag, C <: FP : ClassTag]( + pointOps: PointOps[P, C], maxIterations: Int) extends MultiKMeansClusterer[P, C] { + + def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = { + val runs = centers.length + val active = Array.fill(runs)(true) + val costs = Array.fill(runs)(Zero) + var activeRuns = new ArrayBuffer[Int] ++ (0 until runs) + var iteration = 0 + + /* + * Execute iterations of Lloyd's algorithm until all runs have converged. + */ + + while (iteration < maxIterations && activeRuns.nonEmpty) { + // remove the empty clusters + log.info("iteration {}", iteration) + + val activeCenters = activeRuns.map(r => centers(r)).toArray + + if (log.isInfoEnabled) { + for (r <- 0 until activeCenters.length) + log.info("run {} has {} centers", activeRuns(r), activeCenters(r).length) + } + + // Find the sum and count of points mapping to each center + val (centroids, runDistortion) = getCentroids(data, activeCenters) + + if (log.isInfoEnabled) { + for (run <- activeRuns) log.info("run {} distortion {}", run, runDistortion(run)) + } + + for (run <- activeRuns) active(run) = false + + for (((runIndex: Int, clusterIndex: Int), cn: Centroid) <- centroids) { + val run = activeRuns(runIndex) + if (cn.isEmpty) { + active(run) = true + centers(run)(clusterIndex) = null.asInstanceOf[C] + } else { + val centroid = pointOps.centroidToPoint(cn) + active(run) = active(run) || pointOps.centerMoved(centroid, centers(run)(clusterIndex)) + centers(run)(clusterIndex) = pointOps.pointToCenter(centroid) + } + } + + // filter out null centers + for (r <- activeRuns) centers(r) = centers(r).filter(_ != null) + + // update distortions and print log message if run completed during this iteration + for ((run, runIndex) <- activeRuns.zipWithIndex) { + costs(run) = runDistortion(runIndex) + if (!active(run)) log.info("run {} finished in {} iterations", run, iteration + 1) + } + activeRuns = activeRuns.filter(active(_)) + iteration += 1 + } + + val best = costs.zipWithIndex.min._2 + (costs(best), new GeneralizedKMeansModel(pointOps, centers(best))) + } + + def getCentroids(data: RDD[P], activeCenters: Array[Array[C]]) = { + val runDistortion = activeCenters.map(_ => data.sparkContext.accumulator(Zero)) + val bcActiveCenters = data.sparkContext.broadcast(activeCenters) + val ops = pointOps + val result = data.mapPartitions { points => + val bcCenters = bcActiveCenters.value + val centers = bcCenters.map { + _.map { _ => new Centroid} + } + for ( + point <- points; + (clusters: Array[C], run) <- bcCenters.zipWithIndex + ) { + val (cluster, cost) = ops.findClosest(clusters, point) + runDistortion(run) += cost + centers(run)(cluster).add(point) + } + + val contribution = + for ( + (clusters, run) <- bcCenters.zipWithIndex; + 
(contrib, cluster) <- clusters.zipWithIndex + ) yield { + ((run, cluster), centers(run)(cluster)) + } + + contribution.iterator + }.reduceByKey { (x, y) => x.add(y)}.collect() + bcActiveCenters.unpersist() + (result, runDistortion.map(x => x.localValue)) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala new file mode 100644 index 0000000000000..fab81a279f6ff --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import org.apache.spark.Logging +import org.apache.spark.mllib.base.FP +import org.apache.spark.rdd.RDD + + +private[mllib] trait MultiKMeansClusterer[P <: FP, C <: FP] extends Serializable with Logging { + def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala new file mode 100644 index 0000000000000..b850307b57e0f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.metrics + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} +import org.apache.spark.mllib.base.{Centroid, FPoint, PointOps, Infinity, Zero} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} + + +/** + * Euclidean distance measure + * + * This is the slow implementation of the squared Euclidean distance function, + * shown here simply for clarity. 
+ */ +class EuclideanOps extends PointOps[FPoint, FPoint] with Serializable { + + type C = FPoint + type P = FPoint + + val epsilon = 1e-4 + + def distance(p: P, c: C, upperBound: Double = Infinity): Double = { + val d = p.inh.zip(c.inh).foldLeft(Zero) { + case (d: Double, (a: Double, b: Double)) => d + (a - b) * (a - b) + } + if( d < Zero) Zero else d + } + + def arrayToPoint(raw: Array[Double]) = new FPoint(BDV(raw), 1) + + def vectorToPoint(v: Vector) = { + v match { + case x: DenseVector => new FPoint(new BDV[Double](x.toArray), 1) + case x: SparseVector => new FPoint(new BSV[Double](x.indices, x.values, x.size), 1) + } + } + + def centerToPoint(v: C) = new P(v.raw, v.weight) + + def centroidToPoint(v: Centroid) = new P(v.raw, v.weight) + + def pointToCenter(v: P) = new C(v.raw, v.weight) + + def centroidToCenter(v: Centroid) = new C(v.raw, v.weight) + + def centerToVector(c: C) = new DenseVector(c.inh) + + def centerMoved(v: FPoint, w: FPoint): Boolean = distance(v, w) > epsilon * epsilon + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala new file mode 100644 index 0000000000000..138356ffa14bc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering.metrics + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} +import org.apache.spark.mllib.base._ +import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector} + +import org.apache.spark.mllib.base.{Centroid, FPoint, PointOps, Infinity, Zero} + + + +class FastEUPoint(raw: BV[Double], weight: Double) extends FPoint(raw, weight) { + val norm = if (weight == Zero) Zero else raw.dot(raw) / (weight * weight) +} + +/** + * Euclidean distance measure, expedited by pre-computing vector norms + */ +class FastEuclideanOps extends PointOps[FastEUPoint, FastEUPoint] with Serializable { + + type C = FastEUPoint + type P = FastEUPoint + + val epsilon = 1e-4 + + /* compute a lower bound on the squared Euclidean distance, capped at upperBound */ + + def distance(p: P, c: C, upperBound: Double): Double = { + val d = if (p.weight == Zero || c.weight == Zero) { + p.norm + c.norm + } else { + val x = p.raw.dot(c.raw) / (p.weight * c.weight) + p.norm + c.norm - 2.0 * x + } + if (d < upperBound) { + if (d < Zero) Zero else d + } else { + upperBound + } + } + + def arrayToPoint(raw: Array[Double]) = new P(new BDV[Double](raw), One) + + def vectorToPoint(v: Vector) = { + v match { + case x: DenseVector => new P(new BDV[Double](x.toArray), 1) + case x: SparseVector => new P(new BSV[Double](x.indices, x.values, x.size), 1) + } + } + + def centerToPoint(v: C) = new P(v.raw, v.weight) + + def centroidToPoint(v: Centroid) = new P(v.raw, v.weight) + + def pointToCenter(v: P) = new C(v.raw, v.weight) + + def centroidToCenter(v: Centroid) = new C(v.raw, v.weight) + + def centerToVector(c: C) = new DenseVector(c.inh) + + def centerMoved(v: P, w: C): Boolean = distance(v, w, Infinity) > epsilon * epsilon + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala new file mode 100644 index 0000000000000..8d801f4d1cc0a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.mllib + +import org.apache.spark.mllib.linalg.Vector +import breeze.linalg.{Vector => BV} +import org.apache.spark.rdd.RDD + +package object base { + + val Zero = 0.0 + val One = 1.0 + val Infinity = Double.MaxValue + val Unknown = -1.0 + + private[mllib] trait FP extends Serializable { + val weight: Double + val raw: BV[Double] + } + + private[mllib] class FPoint(val raw: BV[Double], val weight: Double) extends FP { + override def toString: String = weight + "," + (raw.toArray mkString ",") + lazy val inh = (raw :* (1.0 / weight)).toArray + } + + /** + * A mutable point in homogeneous coordinates that is lazily initialized. + */ + private[mllib] class Centroid extends Serializable { + override def toString: String = weight + + (if(raw != null) ("," + raw.toArray mkString ",") else "") + + def isEmpty = weight == Zero + + var raw: BV[Double] = null + + var weight: Double = Zero + + def add(p: Centroid): this.type = add(p.raw, p.weight) + + def add(p: FP): this.type = add(p.raw, p.weight) + + def sub(p: Centroid): this.type = sub(p.raw, p.weight) + + def sub(p: FP): this.type = sub(p.raw, p.weight) + + def sub(r: BV[Double], w: Double): this.type = { + if (r != null) { + if (raw == null) { + raw = r.toVector :*= -1.0 + weight = w * -1 + } else { + raw -= r + weight = weight - w + } + } + this + } + + def add(r: BV[Double], w: Double) : this.type = { + if (r != null) { + if (raw == null) { + raw = r.toVector + weight = w + } else { + raw += r + weight = weight + w + } + } + this + } + } + + private[mllib] trait PointOps[P <: FP, C <: FP] { + + def distance(p: P, c: C, upperBound: Double): Double + + def arrayToPoint(v: Array[Double]): P + + def vectorToPoint(v: Vector): P + + def centerToPoint(v: C): P + + def pointToCenter(v: P): C + + def centroidToCenter(v: Centroid): C + + def centroidToPoint(v: Centroid): P + + def centerMoved(v: P, w: C): Boolean + + def centerToVector(c: C) : Vector + + /** + * Return the index of the closest point in `centers` to `point`, as well as its distance. + */ + def findClosest(centers: Array[C], point: P): (Int, Double) = { + var bestDistance = Infinity + var bestIndex = 0 + var i = 0 + val end = centers.length + while (i < end && bestDistance > 0.0) { + val d = distance(point, centers(i), bestDistance) + if (d < bestDistance) { + bestIndex = i + bestDistance = d + } + i = i + 1 + } + (bestIndex, bestDistance) + } + + def distortion(data: RDD[P], centers: Array[C]) = { + data.mapPartitions{ + points => Array(points.foldLeft(Zero){ + case (total, p) => total + findClosest(centers, p)._2}).iterator + }.reduce( _ + _ ) + } + + /** + * Return the K-means cost of a given point against the given cluster centers. 
+ */ + def pointCost(centers: Array[C], point: P): Double = findClosest(centers, point)._2 + + } + +} diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java index 31676e64025d0..3cc1030710057 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaKMeansSuite.java @@ -76,15 +76,12 @@ public void runKMeansUsingConstructor() { Vector expectedCenter = Vectors.dense(1.0, 3.0, 4.0); JavaRDD data = sc.parallelize(points, 2); - KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); + KMeansModel model = KMeans.train(data.rdd(), 1, 5); assertEquals(1, model.clusterCenters().length); assertEquals(expectedCenter, model.clusterCenters()[0]); - model = new KMeans() - .setK(1) - .setMaxIterations(1) - .setInitializationMode(KMeans.RANDOM()) - .run(data.rdd()); + model = KMeans.train(data.rdd(), 1, 1, 1, KMeans.RANDOM()); + assertEquals(expectedCenter, model.clusterCenters()[0]); } @@ -96,7 +93,7 @@ public void testPredictJavaRDD() { Vectors.dense(1.0, 4.0, 6.0) ); JavaRDD data = sc.parallelize(points, 2); - KMeansModel model = new KMeans().setK(1).setMaxIterations(5).run(data.rdd()); + KMeansModel model = KMeans.train(data.rdd(), 1, 5); JavaRDD predictions = model.predict(data); // Should be able to get the first prediction. predictions.first(); diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index afa1f79b95a12..e1b6d3589bf13 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -56,11 +56,11 @@ class KMeansSuite extends FunSuite with LocalSparkContext { model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5) assert(model.clusterCenters.head ~== center absTol 1E-5) - model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM) + model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM) assert(model.clusterCenters.head ~== center absTol 1E-5) model = KMeans.train( - data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL) + data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL) assert(model.clusterCenters.head ~== center absTol 1E-5) } @@ -75,7 +75,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext { // Make sure code runs. var model = KMeans.train(data, k=2, maxIterations=1) - assert(model.clusterCenters.size === 2) + assert(model.clusterCenters.size === 1) } test("more clusters than points") { @@ -87,7 +87,7 @@ class KMeansSuite extends FunSuite with LocalSparkContext { // Make sure code runs. 
var model = KMeans.train(data, k=3, maxIterations=1) - assert(model.clusterCenters.size === 3) + assert(model.clusterCenters.size === 2) } test("single cluster with big dataset") { @@ -119,11 +119,10 @@ class KMeansSuite extends FunSuite with LocalSparkContext { model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5) assert(model.clusterCenters.head ~== center absTol 1E-5) - model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM) + model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM) assert(model.clusterCenters.head ~== center absTol 1E-5) - model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, - initializationMode = K_MEANS_PARALLEL) + model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL) assert(model.clusterCenters.head ~== center absTol 1E-5) } @@ -164,11 +163,10 @@ class KMeansSuite extends FunSuite with LocalSparkContext { model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5) assert(model.clusterCenters.head ~== center absTol 1E-5) - model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM) + model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = RANDOM) assert(model.clusterCenters.head ~== center absTol 1E-5) - model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, - initializationMode = K_MEANS_PARALLEL) + model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, mode = K_MEANS_PARALLEL) assert(model.clusterCenters.head ~== center absTol 1E-5) data.unpersist() @@ -255,4 +253,4 @@ class KMeansClusterSuite extends FunSuite with LocalClusterSparkContext { val cost = model.computeCost(points) } } -} +} \ No newline at end of file From c6030791406238430548634677a4f9d4c59aa93a Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Mon, 6 Oct 2014 17:12:21 -0700 Subject: [PATCH 2/7] Fixed formatting to conform with Spark style guide. --- .../clustering/GeneralizedKMeansModel.scala | 8 ++-- .../spark/mllib/clustering/KMeans.scala | 24 +++++----- .../spark/mllib/clustering/KMeansModel.scala | 26 ++++++---- .../mllib/clustering/KMeansParallel.scala | 18 ++++--- .../mllib/clustering/KMeansPlusPlus.scala | 47 +++++++++++-------- .../spark/mllib/clustering/KMeansRandom.scala | 33 +++++++------ .../spark/mllib/clustering/MultiKMeans.scala | 18 +++---- .../clustering/MultiKMeansClusterer.scala | 1 - .../clustering/metrics/EuclideanOps.scala | 8 ++-- .../clustering/metrics/FastEuclideanOps.scala | 11 ++--- .../spark/mllib/clustering/package.scala | 21 +++++---- 11 files changed, 114 insertions(+), 101 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala index 988ebda7aca31..97d740025ac7d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GeneralizedKMeansModel.scala @@ -19,32 +19,30 @@ package org.apache.spark.mllib.clustering import org.apache.spark.SparkContext._ import org.apache.spark.api.java.JavaRDD -import org.apache.spark.mllib.base.{PointOps, FP} +import org.apache.spark.mllib.base.{ PointOps, FP } import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD /** * A clustering model for K-means. Each point belongs to the cluster with the closest center. 
*/ -private[mllib] class GeneralizedKMeansModel[P<:FP, C<:FP]( +private[mllib] class GeneralizedKMeansModel[P <: FP, C <: FP]( val pointOps: PointOps[P, C], val centers: Array[C]) extends Serializable { val k: Int = clusterCenters.length - def clusterCenters : Array[Vector] = centers.map{ c => pointOps.centerToVector(c) } + def clusterCenters: Array[Vector] = centers.map { c => pointOps.centerToVector(c) } /** Returns the cluster index that a given point belongs to. */ def predict(point: Vector): Int = pointOps.findClosest(centers, pointOps.vectorToPoint(point))._1 - /** Maps given points to their cluster indices. */ def predict(points: RDD[Vector]): RDD[Int] = points.map(p => pointOps.findClosest(centers, pointOps.vectorToPoint(p))._1) - /** Maps given points to their cluster indices. */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = predict(points.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Integer]] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 762e2bd733cef..68c15d677d7c2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -17,23 +17,23 @@ package org.apache.spark.mllib.clustering +import scala.reflect.ClassTag -import org.apache.spark.mllib.base.{FP, PointOps} +import org.apache.spark.mllib.base.{ FP, PointOps } import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps import org.apache.spark.rdd.RDD import org.apache.spark.Logging - - -import scala.reflect.ClassTag - import org.apache.spark.mllib.linalg.Vector -object KMeans extends Logging { +object KMeans extends Logging { // Initialization mode names val RANDOM = "random" val K_MEANS_PARALLEL = "k-means||" + /** + * Trains a k-means model using specified parameters and the default values for unspecified. + */ def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, mode: String): KMeansModel = new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs, mode)._2) @@ -43,30 +43,28 @@ object KMeans extends Logging { def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel = new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations)._2) - /** * Trains a k-means model using specified parameters and the default values for unspecified. 
*/ - def train( data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel = + def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel = new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs)._2) - - def doTrain[P <: FP, C <: FP](pointOps: PointOps[P, C])( + private def doTrain[P <: FP, C <: FP](pointOps: PointOps[P, C])( raw: RDD[Vector], k: Int = 2, maxIterations: Int = 20, runs: Int = 1, initializationMode: String = K_MEANS_PARALLEL, initializationSteps: Int = 5, - epsilon: Double = 1e-4)(implicit ctag: ClassTag[C], ptag: ClassTag[P]) - : (Double, GeneralizedKMeansModel[P, C]) = { + epsilon: Double = 1e-4 + )(implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { val initializer = if (initializationMode == RANDOM) { new KMeansRandom(pointOps, k, runs) } else { new KMeansParallel(pointOps, k, runs, initializationSteps, 1) } - val data = (raw map { vals => pointOps.vectorToPoint(vals) }).cache() + val data = (raw.map(vals => pointOps.vectorToPoint(vals))).cache() val centers = initializer.init(data, 0) new MultiKMeans(pointOps, maxIterations).cluster(data, centers) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 02199668fd3f5..d4c583483f877 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -26,27 +26,35 @@ import org.apache.spark.mllib.linalg.Vector * A clustering model for K-means. Each point belongs to the cluster with the closest center. */ - -class KMeansModel(specific: GeneralizedKMeansModel[_,_]) { +class KMeansModel(specific: GeneralizedKMeansModel[_, _]) { val k: Int = specific.k /** Returns the cluster index that a given point belongs to. */ def predict(point: Vector): Int = specific.predict(point) - - /** Maps given points to their cluster indices. */ + /** + * Maps given points to their cluster indices. + */ def predict(points: RDD[Vector]): RDD[Int] = specific.predict(points) - - /** Maps given points to their cluster indices. */ + /** + * Maps given points to their cluster indices. + * @param points input points + * @return the predicted cluster index for each input point + */ def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer] = specific.predict(points) /** - * Return the K-means cost (sum of squared distances of points to their nearest center) for this - * model on the given data. + * Get the K-means cost for this model on the given data. 
+ * @param data data for which cost is to be computed + * @return the K-means cost for this model on the given data */ def computeCost(data: RDD[Vector]): Double = specific.computeCost(data) - def clusterCenters: Array[Vector] = specific.clusterCenters + /** + * Get the array of cluster centers + * @return the array of cluster centers + */ + def clusterCenters: Array[Vector] = specific.clusterCenters } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala index 6a001a9ec2754..7fd9f1dbd54fe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansParallel.scala @@ -17,6 +17,9 @@ package org.apache.spark.mllib.clustering +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.broadcast.Broadcast @@ -24,9 +27,6 @@ import org.apache.spark.mllib.base.{ PointOps, FP, Zero } import org.apache.spark.rdd.RDD import org.apache.spark.util.random.XORShiftRandom -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag - private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag]( pointOps: PointOps[P, C], k: Int, @@ -88,8 +88,11 @@ private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag]( * @param step which step of the selection process * @return array of (run, point) */ - def choose(data: RDD[P], seed: Int, step: Int, bcCenters: Broadcast[Array[Array[C]]]) - : Array[(Int, P)] = { + def choose( + data: RDD[P], + seed: Int, + step: Int, + bcCenters: Broadcast[Array[Array[C]]]): Array[(Int, P)] = { // compute the weighted distortion for each run val sumCosts = data.flatMap { point => @@ -122,8 +125,9 @@ private[mllib] class KMeansParallel[P <: FP: ClassTag, C <: FP: ClassTag]( * @param seed random number seed * @return array of sets of cluster centers */ - def finalCenters(data: RDD[P], bcCenters: Broadcast[Array[Array[C]]], seed: Int) - : Array[Array[C]] = { + def finalCenters( + data: RDD[P], + bcCenters: Broadcast[Array[Array[C]]], seed: Int): Array[Array[C]] = { // for each (run, cluster) compute the sum of the weights of the points in the cluster val weightMap = data.flatMap { point => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala index dedd8032e2c7b..cd85f642d7a0e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansPlusPlus.scala @@ -17,13 +17,13 @@ package org.apache.spark.mllib.clustering -import org.apache.spark.mllib.base.{PointOps, FP, Infinity, One, Zero} -import org.apache.spark.util.random.XORShiftRandom -import org.apache.spark.{Logging, SparkContext} - import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag +import org.apache.spark.mllib.base.{ PointOps, FP, Infinity, One, Zero } +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{ Logging, SparkContext } + /** * * The KMeans++ initialization algorithm @@ -32,14 +32,14 @@ import scala.reflect.ClassTag * @tparam P point type * @tparam C center type */ -private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( +private[mllib] class KMeansPlusPlus[P <: FP: ClassTag, C <: FP: ClassTag]( 
pointOps: PointOps[P, C]) extends Serializable with Logging { /** * We will maintain for each point the distance to its closest cluster center. * Since only one center is added on each iteration, recomputing the closest cluster center * only requires computing the distance to the new cluster center if - * that distance is less than the closest cluster center. + * that distance is less than the closest cluster center. */ case class FatPoint(location: P, index: Int, weight: Double, distance: Double) @@ -49,15 +49,15 @@ private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( */ def cluster( - sc: SparkContext, - seed: Int, - points: Array[C], - weights: Array[Double], - k: Int, - maxIterations: Int, - numPartitions: Int): Array[C] = { + sc: SparkContext, + seed: Int, + points: Array[C], + weights: Array[Double], + k: Int, + maxIterations: Int, + numPartitions: Int): Array[C] = { val centers: Array[C] = getCenters(sc, seed, points, weights, k, numPartitions, 1) - val pts = sc.parallelize(points.map(pointOps.centerToPoint)) + val pts = sc.parallelize(points.map(pointOps.centerToPoint)) new MultiKMeans(pointOps, maxIterations).cluster(pts, Array(centers))._2.centers } @@ -75,8 +75,13 @@ private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( * @param perRound the number of centers to add per round * @return an array of at most k cluster centers */ - def getCenters(sc: SparkContext, seed: Int, points: Array[C], weights: Array[Double], k: Int, - numPartitions: Int, perRound: Int): Array[C] = { + def getCenters( + sc: SparkContext, + seed: Int, points: Array[C], + weights: Array[Double], + k: Int, + numPartitions: Int, + perRound: Int): Array[C] = { assert(points.length > 0) assert(k > 0) assert(numPartitions > 0) @@ -120,7 +125,7 @@ private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( * @return indices of chosen points */ def choose(fatPoints: Array[FatPoint], seed: Int, rand: XORShiftRandom, count: Int) = - (0 until count).flatMap { x => pickCenter(rand, fatPoints.iterator)}.map { _.index} + (0 until count).flatMap { x => pickCenter(rand, fatPoints.iterator) }.map { _.index } /** * Create initial fat points with weights given and infinite distance to closest cluster center. 
@@ -129,8 +134,10 @@ private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( * @return fat points with given weighs and infinite distance to closest cluster center */ def initialFatPoints(points: Array[C], weights: Array[Double]): Array[FatPoint] = - (0 until points.length).map{ i => FatPoint( pointOps.centerToPoint(points(i)), i, weights(i), - Infinity)}.toArray + (0 until points.length).map { i => + FatPoint(pointOps.centerToPoint(points(i)), i, weights(i), + Infinity) + }.toArray /** * Update the distance of each point to its closest cluster center, given only the given cluster @@ -151,7 +158,7 @@ private[mllib] class KMeansPlusPlus[P <: FP : ClassTag, C <: FP : ClassTag]( dist = pointOps.distance(point, center(i), dist) i = i + 1 } - p.copy(distance=dist) + p.copy(distance = dist) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala index 8d362c6f86ea8..ff0ab0b623536 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansRandom.scala @@ -17,12 +17,13 @@ package org.apache.spark.mllib.clustering -import org.apache.spark.mllib.base.{FP, PointOps} +import scala.reflect.ClassTag + +import org.apache.spark.mllib.base.{ FP, PointOps } import org.apache.spark.rdd.RDD import org.apache.spark.util.random.XORShiftRandom -import scala.collection.immutable.IndexedSeq -import scala.reflect.ClassTag + /** * Generate a random set of cluster centers from the points. @@ -31,34 +32,36 @@ import scala.reflect.ClassTag * This is important for some distance functions that require points with non-zero weights. * When all weights are one (as is the case with the Euclidean distance function) this filter * has no effect. 
- + * * @param pointOps distance function * @param k number of desired clusters * @param runs number of sets of cluster centers to generate */ -private[mllib] class KMeansRandom[P <: FP : ClassTag, C <: FP : ClassTag]( - pointOps: PointOps[P,C], +private[mllib] class KMeansRandom[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], k: Int, - runs: Int) - extends KMeansInitializer[P,C] { + runs: Int) extends KMeansInitializer[P, C] { def init(data: RDD[P], seed: Int): Array[Array[C]] = { val filtered = data.filter(_.weight > 0) val count = filtered.count() - if( runs * k <= count ) { - val x = filtered.takeSample(withReplacement=false, runs * k, new XORShiftRandom().nextInt()) + if (runs * k <= count) { + val x = filtered.takeSample(withReplacement = false, runs * k, new XORShiftRandom().nextInt()) val centers = x.map(pointOps.pointToCenter).toSeq Array.tabulate(runs)(r => centers.slice(r * k, (r + 1) * k).toArray) - } else if( k < count ) { - (0 to runs).toArray.map{ _ => {filtered.takeSample(withReplacement=false, k, - new XORShiftRandom().nextInt()).map(pointOps.pointToCenter)}} + } else if (k < count) { + (0 to runs).toArray.map { _ => + { + filtered.takeSample(withReplacement = false, k, + new XORShiftRandom().nextInt()).map(pointOps.pointToCenter) + } + } } else { - (0 to runs).toArray.map { _ => filtered.collect().map(pointOps.pointToCenter) } + (0 to runs).toArray.map(_ => filtered.collect().map(pointOps.pointToCenter)) } } - } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala index 766ef05b286bb..42cc10955ef78 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala @@ -17,16 +17,13 @@ package org.apache.spark.mllib.clustering -import org.apache.spark.Accumulator import org.apache.spark.SparkContext._ -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.mllib.base.{Centroid, FP, PointOps, Zero} +import org.apache.spark.mllib.base.{ Centroid, FP, PointOps, Zero } import org.apache.spark.rdd.RDD import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag - /** * A K-Means clustering implementation that performs multiple K-means clusterings simultaneously, * returning the one with the lowest cost. @@ -38,8 +35,9 @@ import scala.reflect.ClassTag * The resulting clustering may contain fewer than K clusters. 
*/ -private[mllib] class MultiKMeans[P <: FP : ClassTag, C <: FP : ClassTag]( - pointOps: PointOps[P, C], maxIterations: Int) extends MultiKMeansClusterer[P, C] { +private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP: ClassTag]( + pointOps: PointOps[P, C], + maxIterations: Int) extends MultiKMeansClusterer[P, C] { def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) = { val runs = centers.length @@ -104,11 +102,9 @@ private[mllib] class MultiKMeans[P <: FP : ClassTag, C <: FP : ClassTag]( val runDistortion = activeCenters.map(_ => data.sparkContext.accumulator(Zero)) val bcActiveCenters = data.sparkContext.broadcast(activeCenters) val ops = pointOps - val result = data.mapPartitions { points => + val result = data.mapPartitions { points => val bcCenters = bcActiveCenters.value - val centers = bcCenters.map { - _.map { _ => new Centroid} - } + val centers = bcCenters.map(x => Array.fill(x.length)(new Centroid())) for ( point <- points; (clusters: Array[C], run) <- bcCenters.zipWithIndex @@ -127,7 +123,7 @@ private[mllib] class MultiKMeans[P <: FP : ClassTag, C <: FP : ClassTag]( } contribution.iterator - }.reduceByKey { (x, y) => x.add(y)}.collect() + }.reduceByKey { (x, y) => x.add(y) }.collect() bcActiveCenters.unpersist() (result, runDistortion.map(x => x.localValue)) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala index fab81a279f6ff..c5a5b237cab22 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala @@ -24,5 +24,4 @@ import org.apache.spark.rdd.RDD private[mllib] trait MultiKMeansClusterer[P <: FP, C <: FP] extends Serializable with Logging { def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) - } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala index b850307b57e0f..22e627dcf07b4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/EuclideanOps.scala @@ -17,10 +17,10 @@ package org.apache.spark.mllib.clustering.metrics -import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} -import org.apache.spark.mllib.base.{Centroid, FPoint, PointOps, Infinity, Zero} -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector} +import breeze.linalg.{ DenseVector => BDV, SparseVector => BSV, Vector => BV } +import org.apache.spark.mllib.base.{ Centroid, FPoint, PointOps, Infinity, Zero } +import org.apache.spark.mllib.linalg.{ DenseVector, SparseVector, Vector } /** * Euclidean distance measure @@ -39,7 +39,7 @@ class EuclideanOps extends PointOps[FPoint, FPoint] with Serializable { val d = p.inh.zip(c.inh).foldLeft(Zero) { case (d: Double, (a: Double, b: Double)) => d + (a - b) * (a - b) } - if( d < Zero) Zero else d + if (d < Zero) Zero else d } def arrayToPoint(raw: Array[Double]) = new FPoint(BDV(raw), 1) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala index 138356ffa14bc..e5cd923eea217 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/metrics/FastEuclideanOps.scala @@ -17,13 +17,11 @@ package org.apache.spark.mllib.clustering.metrics -import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV} -import org.apache.spark.mllib.base._ -import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector} - -import org.apache.spark.mllib.base.{Centroid, FPoint, PointOps, Infinity, Zero} - +import breeze.linalg.{ DenseVector => BDV, SparseVector => BSV, Vector => BV } +import org.apache.spark.mllib.base._ +import org.apache.spark.mllib.linalg.{ SparseVector, DenseVector, Vector } +import org.apache.spark.mllib.base.{ Centroid, FPoint, PointOps, Infinity, Zero } class FastEUPoint(raw: BV[Double], weight: Double) extends FPoint(raw, weight) { val norm = if (weight == Zero) Zero else raw.dot(raw) / (weight * weight) @@ -77,4 +75,3 @@ class FastEuclideanOps extends PointOps[FastEUPoint, FastEUPoint] with Serializa def centerMoved(v: P, w: C): Boolean = distance(v, w, Infinity) > epsilon * epsilon } - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala index 8d801f4d1cc0a..472782b14122a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/package.scala @@ -17,8 +17,9 @@ package org.apache.spark.mllib +import breeze.linalg.{ Vector => BV } + import org.apache.spark.mllib.linalg.Vector -import breeze.linalg.{Vector => BV} import org.apache.spark.rdd.RDD package object base { @@ -35,7 +36,7 @@ package object base { private[mllib] class FPoint(val raw: BV[Double], val weight: Double) extends FP { override def toString: String = weight + "," + (raw.toArray mkString ",") - lazy val inh = (raw :* (1.0 / weight)).toArray + lazy val inh = (raw :* (1.0 / weight)).toArray } /** @@ -43,7 +44,7 @@ package object base { */ private[mllib] class Centroid extends Serializable { override def toString: String = weight + - (if(raw != null) ("," + raw.toArray mkString ",") else "") + (if (raw != null) ("," + raw.toArray mkString ",") else "") def isEmpty = weight == Zero @@ -72,7 +73,7 @@ package object base { this } - def add(r: BV[Double], w: Double) : this.type = { + def add(r: BV[Double], w: Double): this.type = { if (r != null) { if (raw == null) { raw = r.toVector @@ -104,7 +105,7 @@ package object base { def centerMoved(v: P, w: C): Boolean - def centerToVector(c: C) : Vector + def centerToVector(c: C): Vector /** * Return the index of the closest point in `centers` to `point`, as well as its distance. @@ -126,10 +127,12 @@ package object base { } def distortion(data: RDD[P], centers: Array[C]) = { - data.mapPartitions{ - points => Array(points.foldLeft(Zero){ - case (total, p) => total + findClosest(centers, p)._2}).iterator - }.reduce( _ + _ ) + data.mapPartitions { + points => + Array(points.foldLeft(Zero) { + case (total, p) => total + findClosest(centers, p)._2 + }).iterator + }.reduce(_ + _) } /** From f2158d6deae8ccad3da2c28101c047b18ec4b433 Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Mon, 6 Oct 2014 18:53:21 -0700 Subject: [PATCH 3/7] Restored KMeans class and setter methods. 
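This keeps the fluent configuration style working on top of the refactored
internals. As an illustrative sketch only (not part of the diff; examples
stands for whatever RDD[Vector] the caller already has):

    val model = new KMeans()
      .setK(10)
      .setMaxIterations(30)
      .setRuns(2)
      .setInitializationMode(KMeans.K_MEANS_PARALLEL)
      .setInitializationSteps(5)
      .setEpsilon(1e-4)
      .run(examples)

Each setter returns this, so calls can be chained in any order before run(),
which delegates to KMeans.doTrain with the FastEuclideanOps distance function.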
--- .../spark/mllib/clustering/KMeans.scala | 81 ++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 68c15d677d7c2..0a2279879191a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import org.apache.spark.annotation.Experimental + import scala.reflect.ClassTag import org.apache.spark.mllib.base.{ FP, PointOps } @@ -25,6 +27,82 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Logging import org.apache.spark.mllib.linalg.Vector +class KMeans private ( + private var k: Int, + private var maxIterations: Int, + private var runs: Int, + private var initializationMode: String, + private var initializationSteps: Int, + private var epsilon: Double) extends Serializable with Logging { + + def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) + + /** Set the number of clusters to create (k). Default: 2. */ + def setK(k: Int): this.type = { + this.k = k + this + } + + /** Set maximum number of iterations to run. Default: 20. */ + def setMaxIterations(maxIterations: Int): this.type = { + this.maxIterations = maxIterations + this + } + + /** + * Set the initialization algorithm. This can be either "random" to choose random points as + * initial cluster centers, or "k-means||" to use a parallel variant of k-means++ + * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||. + */ + def setInitializationMode(initializationMode: String): this.type = { + if (initializationMode != KMeans.RANDOM && initializationMode != KMeans.K_MEANS_PARALLEL) { + throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode) + } + this.initializationMode = initializationMode + this + } + + /** + * :: Experimental :: + * Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm + * this many times with random starting conditions (configured by the initialization mode), then + * return the best clustering found over any run. Default: 1. + */ + @Experimental + def setRuns(runs: Int): this.type = { + if (runs <= 0) { + throw new IllegalArgumentException("Number of runs must be positive") + } + this.runs = runs + this + } + + /** + * Set the number of steps for the k-means|| initialization mode. This is an advanced + * setting -- the default of 5 is almost always enough. Default: 5. + */ + def setInitializationSteps(initializationSteps: Int): this.type = { + if (initializationSteps <= 0) { + throw new IllegalArgumentException("Number of initialization steps must be positive") + } + this.initializationSteps = initializationSteps + this + } + + /** + * Set the distance threshold within which we've consider centers to have converged. + * If all centers move less than this Euclidean distance, we stop iterating one run. 
+ */ + def setEpsilon(epsilon: Double): this.type = { + this.epsilon = epsilon + this + } + + def run(data: RDD[Vector]): KMeansModel = { + new KMeansModel(KMeans.doTrain(new FastEuclideanOps)(data, k, maxIterations, runs, + initializationMode, initializationSteps)._2) + } +} object KMeans extends Logging { // Initialization mode names @@ -56,8 +134,7 @@ object KMeans extends Logging { runs: Int = 1, initializationMode: String = K_MEANS_PARALLEL, initializationSteps: Int = 5, - epsilon: Double = 1e-4 - )(implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { + epsilon: Double = 1e-4)(implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { val initializer = if (initializationMode == RANDOM) { new KMeansRandom(pointOps, k, runs) From 54c8a4b8652809d1f2b383998884ca2583ca907b Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Mon, 6 Oct 2014 18:59:27 -0700 Subject: [PATCH 4/7] Spark coding formatting. --- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 0a2279879191a..59f9d1d9b9cf5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -127,7 +127,8 @@ object KMeans extends Logging { def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): KMeansModel = new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, runs)._2) - private def doTrain[P <: FP, C <: FP](pointOps: PointOps[P, C])( + private def doTrain[P <: FP, C <: FP]( + pointOps: PointOps[P, C])( raw: RDD[Vector], k: Int = 2, maxIterations: Int = 20, From 8b85bbd4080e9be6d8b027ef64ade8d8b554e9fe Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Mon, 6 Oct 2014 19:00:07 -0700 Subject: [PATCH 5/7] Spark coding formatting. --- .../main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 59f9d1d9b9cf5..e9e8c67e7c12f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -135,7 +135,8 @@ object KMeans extends Logging { runs: Int = 1, initializationMode: String = K_MEANS_PARALLEL, initializationSteps: Int = 5, - epsilon: Double = 1e-4)(implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { + epsilon: Double = 1e-4)( + implicit ctag: ClassTag[C], ptag: ClassTag[P]): (Double, GeneralizedKMeansModel[P, C]) = { val initializer = if (initializationMode == RANDOM) { new KMeansRandom(pointOps, k, runs) From e2b6c02f0f6ed715c7c72a1bd2db217a93deba3c Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Mon, 6 Oct 2014 19:04:03 -0700 Subject: [PATCH 6/7] Added documentation. 
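The new scaladoc spells out the contract of cluster: it takes one or more sets
of initial centers and returns the cost and model of the best resulting
clustering. A rough sketch of how the pieces compose inside the mllib package,
mirroring what KMeans.doTrain does (illustrative only; these classes are
private[mllib], and vectors, k, runs and maxIterations are placeholders):

    val ops = new FastEuclideanOps
    // convert MLlib vectors to the internal point type and cache them
    val points = vectors.map(ops.vectorToPoint).cache()
    // generate one set of k random centers per run
    val initialCenters = new KMeansRandom(ops, k, runs).init(points, 0)
    // run Lloyd's iterations on every set and keep the lowest-cost clustering
    val (cost, model) = new MultiKMeans(ops, maxIterations).cluster(points, initialCenters)

The returned cost is the distortion of the winning run, and model wraps that
run's centers.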
--- .../spark/mllib/clustering/MultiKMeansClusterer.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala index c5a5b237cab22..14fb800e05e27 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeansClusterer.scala @@ -23,5 +23,11 @@ import org.apache.spark.rdd.RDD private[mllib] trait MultiKMeansClusterer[P <: FP, C <: FP] extends Serializable with Logging { + /** + * Cluster the data + * @param data a data set to be clustered + * @param centers a set of sets of initial centers to use for clustering + * @return the best clustering of the data from the initial sets of cluster centers + */ def cluster(data: RDD[P], centers: Array[Array[C]]): (Double, GeneralizedKMeansModel[P, C]) } From 35da8e9e188e669460000d5799d061ecc3ca150f Mon Sep 17 00:00:00 2001 From: Derrick Burns Date: Thu, 9 Oct 2014 09:44:54 -0700 Subject: [PATCH 7/7] Use logInfo instead of log.info. --- .../org/apache/spark/mllib/clustering/MultiKMeans.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala index 42cc10955ef78..827e5229b7a07 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/MultiKMeans.scala @@ -51,21 +51,20 @@ private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP: ClassTag]( */ while (iteration < maxIterations && activeRuns.nonEmpty) { - // remove the empty clusters - log.info("iteration {}", iteration) + logInfo(s"iteration $iteration") val activeCenters = activeRuns.map(r => centers(r)).toArray if (log.isInfoEnabled) { for (r <- 0 until activeCenters.length) - log.info("run {} has {} centers", activeRuns(r), activeCenters(r).length) + logInfo(s"run ${activeRuns(r)} has ${activeCenters(r).length} centers") } // Find the sum and count of points mapping to each center val (centroids, runDistortion) = getCentroids(data, activeCenters) if (log.isInfoEnabled) { - for (run <- activeRuns) log.info("run {} distortion {}", run, runDistortion(run)) + for (run <- activeRuns) logInfo(s"run $run distortion ${runDistortion(run)}") } for (run <- activeRuns) active(run) = false @@ -88,7 +87,7 @@ private[mllib] class MultiKMeans[P <: FP: ClassTag, C <: FP: ClassTag]( // update distortions and print log message if run completed during this iteration for ((run, runIndex) <- activeRuns.zipWithIndex) { costs(run) = runDistortion(runIndex) - if (!active(run)) log.info("run {} finished in {} iterations", run, iteration + 1) + if (!active(run)) logInfo(s"run $run finished in ${iteration + 1} iterations") } activeRuns = activeRuns.filter(active(_)) iteration += 1