From b93350fce951d47e50fafda5bf066d5b29fe9803 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 28 Aug 2014 16:32:05 -0400 Subject: [PATCH 01/21] Streaming KMeans with decay - Used trainOn and predictOn pattern, similar to StreamingLinearAlgorithm - Decay factor can be set explicitly, or via fractional decay parameters expressed in units of number of batches, or number of points - Unit tests for basic functionality and decay settings --- .../mllib/clustering/StreamingKMeans.scala | 143 +++++++++++++ .../clustering/StreamingKMeansSuite.scala | 194 ++++++++++++++++++ 2 files changed, 337 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala new file mode 100644 index 0000000000000..69291fb26ed7a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -0,0 +1,143 @@ +package org.apache.spark.mllib.clustering + +import breeze.linalg.{Vector => BV} + +import scala.reflect.ClassTag +import scala.util.Random._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.StreamingContext._ + +@DeveloperApi +class StreamingKMeansModel( + override val clusterCenters: Array[Vector], + val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) { + + /** do a sequential KMeans update on a batch of data **/ + def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { + + val centers = clusterCenters + val counts = clusterCounts + + // find nearest cluster to each point + val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong))) + + // get sums and counts for updating each cluster + type WeightedPoint = (BV[Double], Long) + def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { + (p1._1 += p2._1, p1._2 + p2._2) + } + val pointStats: Array[(Int, (BV[Double], Long))] = + closest.reduceByKey{mergeContribs}.collectAsMap().toArray + + // implement update rule + for (newP <- pointStats) { + // store old count and centroid + val oldCount = counts(newP._1) + val oldCentroid = centers(newP._1).toBreeze + // get new count and centroid + val newCount = newP._2._2 + val newCentroid = newP._2._1 / newCount.toDouble + // compute the normalized scale factor that controls forgetting + val decayFactor = units match { + case "batches" => newCount / (a * oldCount + newCount) + case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount) + } + // perform the update + val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor + // store the new counts and centers + counts(newP._1) = oldCount + newCount + centers(newP._1) = Vectors.fromBreeze(updatedCentroid) + } + + new StreamingKMeansModel(centers, counts) + } + +} + +@DeveloperApi +class StreamingKMeans( + var k: Int, + var a: Double, + var units: String) extends Logging { + + protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) + + def this() = this(2, 1.0, "batches") + + def setK(k: Int): this.type = { + this.k = k + this + } + + def 
setDecayFactor(a: Double): this.type = { + this.a = a + this + } + + def setUnits(units: String): this.type = { + this.units = units + this + } + + def setDecayFractionBatches(q: Double): this.type = { + this.a = math.log(1 - q) / math.log(0.5) + this.units = "batches" + this + } + + def setDecayFractionPoints(q: Double, m: Double): this.type = { + this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m) + this.units = "points" + this + } + + def setInitialCenters(initialCenters: Array[Vector]): this.type = { + val clusterCounts = Array.fill(this.k)(0).map(_.toLong) + this.model = new StreamingKMeansModel(initialCenters, clusterCounts) + this + } + + def setRandomCenters(d: Int): this.type = { + val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray + val clusterCounts = Array.fill(0)(d).map(_.toLong) + this.model = new StreamingKMeansModel(initialCenters, clusterCounts) + this + } + + def latestModel(): StreamingKMeansModel = { + model + } + + def trainOn(data: DStream[Vector]) { + this.isInitialized + data.foreachRDD { (rdd, time) => + model = model.update(rdd, this.a, this.units) + } + } + + def predictOn(data: DStream[Vector]): DStream[Int] = { + this.isInitialized + data.map(model.predict) + } + + def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { + this.isInitialized + data.mapValues(model.predict) + } + + def isInitialized: Boolean = { + if (Option(model.clusterCenters) == None) { + logError("Initial cluster centers must be set before starting predictions") + throw new IllegalArgumentException + } else { + true + } + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala new file mode 100644 index 0000000000000..a930bba0c5e19 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala @@ -0,0 +1,194 @@ +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.TestSuiteBase + +class StreamingKMeansSuite extends FunSuite with TestSuiteBase { + + override def maxWaitTimeMillis = 30000 + + test("accuracy for single center and equivalence to grand average") { + + // set parameters + val numBatches = 10 + val numPoints = 50 + val k = 1 + val d = 5 + val r = 0.1 + + // create model with one cluster + val model = new StreamingKMeans() + .setK(1) + .setDecayFactor(1.0) + .setInitialCenters(Array(Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0))) + + // generate random data for kmeans + val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42) + + // setup and run the model training + val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // estimated center should be close to true center + assert(centers(0) ~== model.latestModel().clusterCenters(0) absTol 1E-1) + + // estimated center from streaming should exactly match the arithmetic mean of all data points + val grandMean = input.flatten.map(x => x.toBreeze).reduce(_+_) / (numBatches * numPoints).toDouble + assert(model.latestModel().clusterCenters(0) ~== 
Vectors.dense(grandMean.toArray) absTol 1E-5) + + } + + test("accuracy for two centers") { + + val numBatches = 10 + val numPoints = 5 + val k = 2 + val d = 5 + val r = 0.1 + + // create model with two clusters + val model = new StreamingKMeans() + .setK(2) + .setDecayFactor(1.0) + .setInitialCenters(Array(Vectors.dense(-0.1, 0.1, -0.2, -0.3, -0.1), + Vectors.dense(0.1, -0.2, 0.0, 0.2, 0.1))) + + // generate random data for kmeans + val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42) + + // setup and run the model training + val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // check that estimated centers are close to true centers + // NOTE this depends on the initialization! allow for binary flip + assert(centers(0) ~== model.latestModel().clusterCenters(0) absTol 1E-1) + assert(centers(1) ~== model.latestModel().clusterCenters(1) absTol 1E-1) + + } + + test("drifting with fractional decay in units of batches") { + + val numBatches1 = 50 + val numBatches2 = 50 + val numPoints = 1 + val q = 0.25 + val k = 1 + val d = 1 + val r = 2.0 + + // create model with two clusters + val model = new StreamingKMeans() + .setK(1) + .setDecayFractionBatches(q) + .setInitialCenters(Array(Vectors.dense(0.0))) + + // create two batches of data with different, pre-specified centers + // to simulate a transition from one cluster to another + val (input1, centers1) = StreamingKMeansDataGenerator( + numPoints, numBatches1, k, d, r, 42, initCenters = Array(Vectors.dense(100.0))) + val (input2, centers2) = StreamingKMeansDataGenerator( + numPoints, numBatches2, k, d, r, 84, initCenters = Array(Vectors.dense(0.0))) + + // store the history + val history = new ArrayBuffer[Double](numBatches1 + numBatches2) + + // setup and run the model training + val ssc = setupStreams(input1 ++ input2, (inputDStream: DStream[Vector]) => { + model.trainOn(inputDStream) + // extract the center (in this case one-dimensional) + inputDStream.foreachRDD(x => history.append(model.latestModel().clusterCenters(0)(0))) + inputDStream.count() + }) + runStreams(ssc, numBatches1 + numBatches2, numBatches1 + numBatches2) + + // check that the fraction of batches required to reach 50 + // equals the setting of q, by finding the index of the first batch + // below 50 and comparing to total number of batches received + val halvedIndex = history.zipWithIndex.filter( x => x._1 < 50)(0)._2.toDouble + val fraction = (halvedIndex - numBatches1.toDouble) / halvedIndex + assert(fraction ~== q absTol 1E-1) + + } + + test("drifting with fractional decay in units of points") { + + val numBatches1 = 50 + val numBatches2 = 50 + val numPoints = 10 + val q = 0.25 + val k = 1 + val d = 1 + val r = 2.0 + + // create model with two clusters + val model = new StreamingKMeans() + .setK(1) + .setDecayFractionPoints(q, numPoints) + .setInitialCenters(Array(Vectors.dense(0.0))) + + // create two batches of data with different, pre-specified centers + // to simulate a transition from one cluster to another + val (input1, centers1) = StreamingKMeansDataGenerator( + numPoints, numBatches1, k, d, r, 42, initCenters = Array(Vectors.dense(100.0))) + val (input2, centers2) = StreamingKMeansDataGenerator( + numPoints, numBatches2, k, d, r, 84, initCenters = Array(Vectors.dense(0.0))) + + // store the history + val history = new ArrayBuffer[Double](numBatches1 + numBatches2) + + // setup and run the model training + val 
ssc = setupStreams(input1 ++ input2, (inputDStream: DStream[Vector]) => { + model.trainOn(inputDStream) + // extract the center (in this case one-dimensional) + inputDStream.foreachRDD(x => history.append(model.latestModel().clusterCenters(0)(0))) + inputDStream.count() + }) + runStreams(ssc, numBatches1 + numBatches2, numBatches1 + numBatches2) + + // check that the fraction of batches required to reach 50 + // equals the setting of q, by finding the index of the first batch + // below 50 and comparing to total number of batches received + val halvedIndex = history.zipWithIndex.filter( x => x._1 < 50)(0)._2.toDouble + val fraction = (halvedIndex - numBatches1.toDouble) / halvedIndex + assert(fraction ~== q absTol 1E-1) + + } + + def StreamingKMeansDataGenerator( + numPoints: Int, + numBatches: Int, + k: Int, + d: Int, + r: Double, + seed: Int, + initCenters: Array[Vector] = null): (IndexedSeq[IndexedSeq[Vector]], Array[Vector]) = { + val rand = new Random(seed) + val centers = initCenters match { + case null => Array.fill(k)(Vectors.dense(Array.fill(d)(rand.nextGaussian()))) + case _ => initCenters + } + val data = (0 until numBatches).map { i => + (0 until numPoints).map { idx => + val center = centers(idx % k) + Vectors.dense(Array.tabulate(d)(x => center(x) + rand.nextGaussian() * r)) + } + } + (data, centers) + } + + +} From b5b5f8d41dab067c0ed5b5b9de88d7613dda84ef Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 02:17:56 -0400 Subject: [PATCH 02/21] Add better documentation --- .../mllib/clustering/StreamingKMeans.scala | 89 ++++++++++++++++++- .../clustering/StreamingKMeansSuite.scala | 20 ++++- 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 69291fb26ed7a..d1953a12dbe90 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.clustering import breeze.linalg.{Vector => BV} @@ -13,12 +30,45 @@ import org.apache.spark.SparkContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext._ +/** + * :: DeveloperApi :: + * + * StreamingKMeansModel extends MLlib's KMeansModel for streaming + * algorithms, so it can keep track of the number of points assigned + * to each cluster, and also update the model by doing a single iteration + * of the standard KMeans algorithm. + * + * The update algorithm uses the "mini-batch" KMeans rule, + * generalized to incorporate forgetfullness (i.e. decay). 
+ * The basic update rule (for each cluster) is: + * + * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t] + * n_t+t = n_t + m_t + * + * Where c_t is the previously estimated centroid for that cluster, + * n_t is the number of points assigned to it thus far, x_t is the centroid + * estimated on the current batch, and m_t is the number of points assigned + * to that centroid in the current batch. + * + * This update rule is modified with a decay factor 'a' that scales + * the contribution of the clusters as estimated thus far. + * If a=1, all batches are weighted equally. If a=0, new centroids + * are determined entirely by recent data. Lower values correspond to + * more forgetting. + * + * Decay can optionally be specified as a decay fraction 'q', + * which corresponds to the fraction of batches (or points) + * after which the past will be reduced to a contribution of 0.5. + * This decay fraction can be specified in units of 'points' or 'batches'. + * if 'batches', behavior will be independent of the number of points per batch; + * if 'points', the expected number of points per batch must be specified. + */ @DeveloperApi class StreamingKMeansModel( override val clusterCenters: Array[Vector], val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) { - /** do a sequential KMeans update on a batch of data **/ + // do a sequential KMeans update on a batch of data def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { val centers = clusterCenters @@ -70,39 +120,49 @@ class StreamingKMeans( def this() = this(2, 1.0, "batches") + /** Set the number of clusters. */ def setK(k: Int): this.type = { this.k = k this } + /** Set the decay factor directly (for forgetful algorithms). */ def setDecayFactor(a: Double): this.type = { this.a = a this } + /** Set the decay units for forgetful algorithms ("batches" or "points"). */ def setUnits(units: String): this.type = { + if (units != "batches" && units != "points") { + throw new IllegalArgumentException("Invalid units for decay: " + units) + } this.units = units this } + /** Set decay fraction in units of batches. */ def setDecayFractionBatches(q: Double): this.type = { this.a = math.log(1 - q) / math.log(0.5) this.units = "batches" this } + /** Set decay fraction in units of points. Must specify expected number of points per batch. */ def setDecayFractionPoints(q: Double, m: Double): this.type = { this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m) this.units = "points" this } + /** Specify initial explicitly directly. */ def setInitialCenters(initialCenters: Array[Vector]): this.type = { val clusterCounts = Array.fill(this.k)(0).map(_.toLong) this.model = new StreamingKMeansModel(initialCenters, clusterCounts) this } + /** Initialize random centers, requiring only the number of dimensions. */ def setRandomCenters(d: Int): this.type = { val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray val clusterCounts = Array.fill(0)(d).map(_.toLong) @@ -110,10 +170,19 @@ class StreamingKMeans( this } + /** Return the latest model. */ def latestModel(): StreamingKMeansModel = { model } + /** + * Update the clustering model by training on batches of data from a DStream. + * This operation registers a DStream for training the model, + * checks whether the cluster centers have been initialized, + * and updates the model using each batch of data from the stream. 
+ * + * @param data DStream containing vector data + */ def trainOn(data: DStream[Vector]) { this.isInitialized data.foreachRDD { (rdd, time) => @@ -121,16 +190,34 @@ class StreamingKMeans( } } + /** + * Use the clustering model to make predictions on batches of data from a DStream. + * + * @param data DStream containing vector data + * @return DStream containing predictions + */ def predictOn(data: DStream[Vector]): DStream[Int] = { this.isInitialized data.map(model.predict) } + /** + * Use the model to make predictions on the values of a DStream and carry over its keys. + * + * @param data DStream containing (key, feature vector) pairs + * @tparam K key type + * @return DStream containing the input keys and the predictions as values + */ def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { this.isInitialized data.mapValues(model.predict) } + /** + * Check whether cluster centers have been initialized. + * + * @return Boolean, True if cluster centrs have been initialized + */ def isInitialized: Boolean = { if (Option(model.clusterCenters) == None) { logError("Initial cluster centers must be set before starting predictions") diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala index a930bba0c5e19..5c23b04961df2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer @@ -43,6 +60,7 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { assert(centers(0) ~== model.latestModel().clusterCenters(0) absTol 1E-1) // estimated center from streaming should exactly match the arithmetic mean of all data points + // because the decay factor is set to 1.0 val grandMean = input.flatten.map(x => x.toBreeze).reduce(_+_) / (numBatches * numPoints).toDouble assert(model.latestModel().clusterCenters(0) ~== Vectors.dense(grandMean.toArray) absTol 1E-5) @@ -74,7 +92,7 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { runStreams(ssc, numBatches, numBatches) // check that estimated centers are close to true centers - // NOTE this depends on the initialization! allow for binary flip + // NOTE exact assignment depends on the initialization! 
assert(centers(0) ~== model.latestModel().clusterCenters(0) absTol 1E-1) assert(centers(1) ~== model.latestModel().clusterCenters(1) absTol 1E-1) From f33684b2e59593a71c577e48c4ab1356444c84d6 Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 04:03:35 -0400 Subject: [PATCH 03/21] Add explanation and example to docs --- docs/mllib-clustering.md | 74 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 7978e934fb36b..d0aa1a8bb0678 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -34,7 +34,7 @@ a given dataset, the algorithm returns the best clustering result). * *initializationSteps* determines the number of steps in the k-means\|\| algorithm. * *epsilon* determines the distance threshold within which we consider k-means to have converged. -## Examples +### Examples
@@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-ap section of the Spark Quick Start guide. Be sure to also include *spark-mllib* to your build file as a dependency. + +## Streaming clustering + +When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using: + +`\begin{equation} + c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} +\end{equation}` +`\begin{equation} + n_{t+1} = n_t + m_t +\end{equation}` + +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an expontentially-weighted moving average. + +### Examples + +This example shows how to estimate clusters on streaming data. + +
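Before the walkthrough below, the decay-weighted update described a few paragraphs up is easy to sanity-check by hand. A standalone Scala sketch (illustration only, not part of this patch; the one-dimensional centre and count values are made-up numbers):

{% highlight scala %}
// Per-cluster update from the equations above, in one dimension.
// c = previous centre, n = points seen so far, x = batch centre, m = batch count.
val (c, n) = (10.0, 100L)
val (x, m) = (20.0, 50L)

def updatedCentre(alpha: Double): Double =
  (c * n * alpha + x * m) / (n * alpha + m)

println(updatedCentre(1.0)) // 13.33... -- the running mean over all 150 points
println(updatedCentre(0.0)) // 20.0     -- only the newest batch is used
{% endhighlight %}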
+ +
+ +First we import the neccessary classes. + +{% highlight scala %} + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.clustering.StreamingKMeans + +{% endhighlight %} + +Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data. + +{% highlight scala %} + +val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) +val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse) + +{% endhighlight %} + +We create a model with random clusters and specify the number of clusters to find + +{% highlight scala %} + +val numDimensions = 3 +val numClusters = 2 +val model = new StreamingKMeans() + .setK(numClusters) + .setDecayFactor(1.0) + .setRandomWeights(numDimensions) + +{% endhighlight %} + +Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive. + +{% highlight scala %} + +model.trainOn(trainingData) +model.predictOn(testData).print() + +ssc.start() +ssc.awaitTermination() + +{% endhighlight %} + +As you add new text files with data the cluster centers will update. Each data point should be formatted as `[x1, x2, x3]`. Anytime a text file is placed in `/training/data/dir` +the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions. With new data, the cluster centers will change. + + +
+ +
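Since the walkthrough above reads whole text files from a directory, a hypothetical helper for producing one training batch in the `[x1, x2, x3]` format that `Vectors.parse` expects may be useful for trying it out (the output path and file name are placeholders, not part of this patch):

{% highlight scala %}
import java.io.PrintWriter
import scala.util.Random

// Write one batch of random points, one "[x1,x2,...,xd]" vector per line.
def writeBatch(path: String, numPoints: Int, dim: Int, seed: Long = 42): Unit = {
  val rnd = new Random(seed)
  val out = new PrintWriter(path)
  try {
    (0 until numPoints).foreach { _ =>
      out.println(Array.fill(dim)(rnd.nextGaussian()).mkString("[", ",", "]"))
    }
  } finally {
    out.close()
  }
}

writeBatch("/training/data/dir/batch-0001.txt", numPoints = 100, dim = 3)
{% endhighlight %}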
From 5db7074cab7663cc88feeda8f61212ade48ca9a0 Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 04:03:51 -0400 Subject: [PATCH 04/21] Example usage for StreamingKMeans --- .../examples/mllib/StreamingKMeans.scala | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala new file mode 100644 index 0000000000000..8dc410be8d86b --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.clustering.StreamingKMeans +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} + +/** + * Estimate clusters on one stream of data and make predictions + * on another stream, where the data streams arrive as text files + * into two different directories. + * + * The rows of the text files must be vector data in the form + * `[x1,x2,x3,...,xn]` + * Where n is the number of dimensions. n must be the same for train and test. + * + * Usage: StreamingKmeans + * + * To run on your local machine using the two directories `trainingDir` and `testDir`, + * with updates every 5 seconds, 2 dimensions per data point, and 3 clusters, call: + * $ bin/run-example \ + * org.apache.spark.examples.mllib.StreamingKMeans trainingDir testDir 5 3 2 + * + * As you add text files to `trainingDir` the clusters will continuously update. + * Anytime you add text files to `testDir`, you'll see predicted labels using the current model. 
+ * + */ +object StreamingKMeans { + + def main(args: Array[String]) { + + if (args.length != 5) { + System.err.println( + "Usage: StreamingKMeans " + + " ") + System.exit(1) + } + + val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression") + val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) + + val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) + val testData = ssc.textFileStream(args(1)).map(Vectors.parse) + + val model = new StreamingKMeans() + .setK(args(3).toInt) + .setDecayFactor(1.0) + .setRandomCenters(args(4).toInt) + + model.trainOn(trainingData) + model.predictOn(testData).print() + + ssc.start() + ssc.awaitTermination() + + } + +} From 9facbe3ecbc14679b83053fa5f471dd50ab68fbd Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 04:04:11 -0400 Subject: [PATCH 05/21] Bug fix --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index d1953a12dbe90..f3f528d356549 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -165,7 +165,7 @@ class StreamingKMeans( /** Initialize random centers, requiring only the number of dimensions. */ def setRandomCenters(d: Int): this.type = { val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray - val clusterCounts = Array.fill(0)(d).map(_.toLong) + val clusterCounts = Array.fill(this.k)(0).map(_.toLong) this.model = new StreamingKMeansModel(initialCenters, clusterCounts) this } From ea9877c242b06ba690e6237f095137efc2f76faa Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 04:04:24 -0400 Subject: [PATCH 06/21] More documentation --- .../spark/mllib/clustering/StreamingKMeans.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index f3f528d356549..92564eb0f535b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -62,6 +62,16 @@ import org.apache.spark.streaming.StreamingContext._ * This decay fraction can be specified in units of 'points' or 'batches'. * if 'batches', behavior will be independent of the number of points per batch; * if 'points', the expected number of points per batch must be specified. 
+ * + * Use a builder pattern to construct a streaming KMeans analysis + * in an application, like: + * + * val model = new StreamingKMeans() + * .setDecayFactor(0.5) + * .setK(3) + * .setRandomCenters(5) + * .trainOn(DStream) + * */ @DeveloperApi class StreamingKMeansModel( From 2086bdc56a29f63a1c2143f88303e1296df45260 Mon Sep 17 00:00:00 2001 From: freeman Date: Sat, 25 Oct 2014 04:04:31 -0400 Subject: [PATCH 07/21] Log cluster center updates --- .../spark/mllib/clustering/StreamingKMeans.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 92564eb0f535b..706b4f1d187dc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -76,7 +76,7 @@ import org.apache.spark.streaming.StreamingContext._ @DeveloperApi class StreamingKMeansModel( override val clusterCenters: Array[Vector], - val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) { + val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging { // do a sequential KMeans update on a batch of data def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { @@ -113,8 +113,14 @@ class StreamingKMeansModel( // store the new counts and centers counts(newP._1) = oldCount + newCount centers(newP._1) = Vectors.fromBreeze(updatedCentroid) - } + // display the updated cluster centers + val display = centers(newP._1).size match { + case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...") + case _ => centers(newP._1).toArray.mkString("[", ",", "]") + } + logInfo("Cluster %d updated: %s ".format (newP._1, display)) + } new StreamingKMeansModel(centers, counts) } From ea22ec8d218a814dd8ab8d214cf167f4ea15a468 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:12:06 -0700 Subject: [PATCH 08/21] Fix imports --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 706b4f1d187dc..147ecc1b1a054 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -17,10 +17,9 @@ package org.apache.spark.mllib.clustering -import breeze.linalg.{Vector => BV} - import scala.reflect.ClassTag -import scala.util.Random._ + +import breeze.linalg.{Vector => BV} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.Logging @@ -29,6 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.util.Utils /** * :: DeveloperApi :: From 1472ec5a6e9ff90367359860924ee81ece459561 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:12:56 -0700 Subject: [PATCH 09/21] Doc formatting --- .../mllib/clustering/StreamingKMeans.scala | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 147ecc1b1a054..9c898aa7c0664 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -32,11 +32,10 @@ import org.apache.spark.util.Utils /** * :: DeveloperApi :: - * * StreamingKMeansModel extends MLlib's KMeansModel for streaming * algorithms, so it can keep track of the number of points assigned * to each cluster, and also update the model by doing a single iteration - * of the standard KMeans algorithm. + * of the standard k-means algorithm. * * The update algorithm uses the "mini-batch" KMeans rule, * generalized to incorporate forgetfullness (i.e. decay). @@ -63,22 +62,13 @@ import org.apache.spark.util.Utils * if 'batches', behavior will be independent of the number of points per batch; * if 'points', the expected number of points per batch must be specified. * - * Use a builder pattern to construct a streaming KMeans analysis - * in an application, like: - * - * val model = new StreamingKMeans() - * .setDecayFactor(0.5) - * .setK(3) - * .setRandomCenters(5) - * .trainOn(DStream) - * */ @DeveloperApi class StreamingKMeansModel( override val clusterCenters: Array[Vector], val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging { - // do a sequential KMeans update on a batch of data + /** Perform a k-means update on a batch of data. */ def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { val centers = clusterCenters @@ -125,7 +115,22 @@ class StreamingKMeansModel( } } - +/** + * :: DeveloperApi :: + * StreamingKMeans provides methods for configuring a + * streaming k-means analysis, training the model on streaming, + * and using the model to make predictions on streaming data. + * See KMeansModel for details on algorithm and update rules. + * + * Use a builder pattern to construct a streaming k-means analysis + * in an application, like: + * + * val model = new StreamingKMeans() + * .setDecayFactor(0.5) + * .setK(3) + * .setRandomCenters(5) + * .trainOn(DStream) + */ @DeveloperApi class StreamingKMeans( var k: Int, @@ -171,7 +176,7 @@ class StreamingKMeans( this } - /** Specify initial explicitly directly. */ + /** Specify initial centers directly. 
*/ def setInitialCenters(initialCenters: Array[Vector]): this.type = { val clusterCounts = Array.fill(this.k)(0).map(_.toLong) this.model = new StreamingKMeansModel(initialCenters, clusterCounts) From a4a316b2a38359b335f94b4574b4139302cee300 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:13:06 -0700 Subject: [PATCH 10/21] Use collect --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 9c898aa7c0664..0b07b70fa1101 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -83,7 +83,7 @@ class StreamingKMeansModel( (p1._1 += p2._1, p1._2 + p2._2) } val pointStats: Array[(Int, (BV[Double], Long))] = - closest.reduceByKey{mergeContribs}.collectAsMap().toArray + closest.reduceByKey(mergeContribs).collect() // implement update rule for (newP <- pointStats) { From 2899623d17b5eca0da121e20e324b48a15121548 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:13:27 -0700 Subject: [PATCH 11/21] Use pattern matching for clarity --- .../mllib/clustering/StreamingKMeans.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 0b07b70fa1101..9bf360c8fa27b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -86,13 +86,13 @@ class StreamingKMeansModel( closest.reduceByKey(mergeContribs).collect() // implement update rule - for (newP <- pointStats) { + pointStats.foreach { case (label, (mean, count)) => // store old count and centroid - val oldCount = counts(newP._1) - val oldCentroid = centers(newP._1).toBreeze + val oldCount = counts(label) + val oldCentroid = centers(label).toBreeze // get new count and centroid - val newCount = newP._2._2 - val newCentroid = newP._2._1 / newCount.toDouble + val newCount = count + val newCentroid = mean / newCount.toDouble // compute the normalized scale factor that controls forgetting val decayFactor = units match { case "batches" => newCount / (a * oldCount + newCount) @@ -101,15 +101,15 @@ class StreamingKMeansModel( // perform the update val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor // store the new counts and centers - counts(newP._1) = oldCount + newCount - centers(newP._1) = Vectors.fromBreeze(updatedCentroid) + counts(label) = oldCount + newCount + centers(label) = Vectors.fromBreeze(updatedCentroid) // display the updated cluster centers - val display = centers(newP._1).size match { - case x if x > 100 => centers(newP._1).toArray.take(100).mkString("[", ",", "...") - case _ => centers(newP._1).toArray.mkString("[", ",", "]") + val display = centers(label).size match { + case x if x > 100 => centers(label).toArray.take(100).mkString("[", ",", "...") + case _ => centers(label).toArray.mkString("[", ",", "]") } - logInfo("Cluster %d updated: %s ".format (newP._1, display)) + logInfo("Cluster %d updated: %s ".format (label, display)) } new StreamingKMeansModel(centers, counts) } From c7050d556724111670b9c6e524eaf4c3c6c0155d Mon Sep 17 
00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:13:36 -0700 Subject: [PATCH 12/21] Fix spacing --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 9bf360c8fa27b..03d6b26fad0ec 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -133,9 +133,9 @@ class StreamingKMeansModel( */ @DeveloperApi class StreamingKMeans( - var k: Int, - var a: Double, - var units: String) extends Logging { + var k: Int, + var a: Double, + var units: String) extends Logging { protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) From 44050a92a4b72b86e459f688f53bffa73e0e6be5 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:13:58 -0700 Subject: [PATCH 13/21] Simpler constructor --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 03d6b26fad0ec..1ee4ea7b796e2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -178,7 +178,7 @@ class StreamingKMeans( /** Specify initial centers directly. */ def setInitialCenters(initialCenters: Array[Vector]): this.type = { - val clusterCounts = Array.fill(this.k)(0).map(_.toLong) + val clusterCounts = new Array[Long](this.k) this.model = new StreamingKMeansModel(initialCenters, clusterCounts) this } From 9cfc30166204a53d6c0656a3e5db3444c13674d5 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:14:15 -0700 Subject: [PATCH 14/21] Make random seed an argument --- .../mllib/clustering/StreamingKMeans.scala | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 1ee4ea7b796e2..1cb2b55a7fc8e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -183,10 +183,19 @@ class StreamingKMeans( this } - /** Initialize random centers, requiring only the number of dimensions. */ - def setRandomCenters(d: Int): this.type = { - val initialCenters = (0 until k).map(_ => Vectors.dense(Array.fill(d)(nextGaussian()))).toArray - val clusterCounts = Array.fill(this.k)(0).map(_.toLong) + /** Initialize random centers, requiring only the number of dimensions. 
+ * + * @param dim Number of dimensions + * @param seed Random seed + * */ + def setRandomCenters(dim: Int, seed: Long = Utils.random.nextLong): this.type = { + + val random = Utils.random + random.setSeed(seed) + + val initialCenters = (0 until k) + .map(_ => Vectors.dense(Array.fill(dim)(random.nextGaussian()))).toArray + val clusterCounts = new Array[Long](this.k) this.model = new StreamingKMeansModel(initialCenters, clusterCounts) this } From 77dbd3fe9ec025868f4a89936afca3b76f53fbf8 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:14:28 -0700 Subject: [PATCH 15/21] Make initialization check an assertion --- .../mllib/clustering/StreamingKMeans.scala | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 1cb2b55a7fc8e..2520b89ac0569 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -214,7 +214,7 @@ class StreamingKMeans( * @param data DStream containing vector data */ def trainOn(data: DStream[Vector]) { - this.isInitialized + this.assertInitialized() data.foreachRDD { (rdd, time) => model = model.update(rdd, this.a, this.units) } @@ -227,7 +227,7 @@ class StreamingKMeans( * @return DStream containing predictions */ def predictOn(data: DStream[Vector]): DStream[Int] = { - this.isInitialized + this.assertInitialized() data.map(model.predict) } @@ -239,21 +239,14 @@ class StreamingKMeans( * @return DStream containing the input keys and the predictions as values */ def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { - this.isInitialized + this.assertInitialized() data.mapValues(model.predict) } - /** - * Check whether cluster centers have been initialized. - * - * @return Boolean, True if cluster centrs have been initialized - */ - def isInitialized: Boolean = { + /** Check whether cluster centers have been initialized.*/ + def assertInitialized(): Unit = { if (Option(model.clusterCenters) == None) { - logError("Initial cluster centers must be set before starting predictions") - throw new IllegalArgumentException - } else { - true + throw new IllegalStateException("Initial cluster centers must be set before starting predictions") } } From ad9bdc2dc6a774d4f83a66366ae7a0f581c625f3 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:15:06 -0700 Subject: [PATCH 16/21] Use labeled points and predictOnValues in examples --- docs/mllib-clustering.md | 5 +++-- .../spark/examples/mllib/StreamingKMeans.scala | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index d0aa1a8bb0678..2428d84ec5e03 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -180,6 +180,7 @@ First we import the neccessary classes. 
{% highlight scala %} import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.clustering.StreamingKMeans {% endhighlight %} @@ -189,7 +190,7 @@ Then we make an input stream of vectors for training, as well as one for testing {% highlight scala %} val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse) -val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse) +val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse) {% endhighlight %} @@ -211,7 +212,7 @@ Now register the streams for training and testing and start the job, printing th {% highlight scala %} model.trainOn(trainingData) -model.predictOn(testData).print() +model.predictOnValues(testData).print() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala index 8dc410be8d86b..79416ae734c52 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala @@ -18,6 +18,7 @@ package org.apache.spark.examples.mllib import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -27,9 +28,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} * on another stream, where the data streams arrive as text files * into two different directories. * - * The rows of the text files must be vector data in the form + * The rows of the training text files must be vector data in the form * `[x1,x2,x3,...,xn]` - * Where n is the number of dimensions. n must be the same for train and test. + * Where n is the number of dimensions. + * + * The rows of the test text files must be labeled data in the form + * `(y,[x1,x2,x3,...,xn])` + * Where y is some identifier. n must be the same for train and test. * * Usage: StreamingKmeans * @@ -57,7 +62,7 @@ object StreamingKMeans { val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) - val testData = ssc.textFileStream(args(1)).map(Vectors.parse) + val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse) val model = new StreamingKMeans() .setK(args(3).toInt) @@ -65,7 +70,7 @@ object StreamingKMeans { .setRandomCenters(args(4).toInt) model.trainOn(trainingData) - model.predictOn(testData).print() + model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination() From 374a706dd9e4ab064a2003d9440b91ac930e86c4 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:15:21 -0700 Subject: [PATCH 17/21] Formatting --- docs/mllib-clustering.md | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 2428d84ec5e03..d0b258b1678b6 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -156,7 +156,11 @@ a dependency. ## Streaming clustering -When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. 
The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute new cluster centers, then update each cluster using: +When data arrive in a stream, we may want to estimate clusters dynamically, +updating them as new data arrive. MLlib provides support for streaming k-means clustering, +with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm +uses a generalization of the mini-batch k-means update rule. For each batch of data, we assign +all points to their nearest cluster, compute new cluster centers, then update each cluster using: `\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} @@ -165,7 +169,12 @@ When data arrive in a stream, we may want to estimate clusters dynamically, upda n_{t+1} = n_t + m_t \end{equation}` -Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an expontentially-weighted moving average. +Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned +to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` +is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` +can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; +with `$\alpha$=0` only the most recent data will be used. This is analogous to an +exponentially-weighted moving average. ### Examples @@ -185,7 +194,9 @@ import org.apache.spark.mllib.clustering.StreamingKMeans {% endhighlight %} -Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data. +Then we make an input stream of vectors for training, as well as a stream of labeled data +points for testing. We assume a StreamingContext `ssc` has been created, see +[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. {% highlight scala %} @@ -201,13 +212,14 @@ We create a model with random clusters and specify the number of clusters to fin val numDimensions = 3 val numClusters = 2 val model = new StreamingKMeans() - .setK(numClusters) - .setDecayFactor(1.0) - .setRandomWeights(numDimensions) + .setK(numClusters) + .setDecayFactor(1.0) + .setRandomWeights(numDimensions) {% endhighlight %} -Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive. +Now register the streams for training and testing and start the job, printing +the predicted cluster assignments on new data points as they arrive. {% highlight scala %} @@ -219,9 +231,12 @@ ssc.awaitTermination() {% endhighlight %} -As you add new text files with data the cluster centers will update. Each data point should be formatted as `[x1, x2, x3]`. Anytime a text file is placed in `/training/data/dir` -the model will update. 
Anytime a text file is placed in `/testing/data/dir` you will see predictions. With new data, the cluster centers will change. - +As you add new text files with data the cluster centers will update. Each training +point should be formatted as `[x1, x2, x3]`, and each test data point +should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier +(e.g. a true category assignment). Anytime a text file is placed in `/training/data/dir` +the model will update. Anytime a text file is placed in `/testing/data/dir` +you will see predictions. With new data, the cluster centers will change!
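The labeled test-record format referred to above can be checked quickly in a REPL; a hypothetical snippet (the label 7.0 and the feature values are arbitrary):

{% highlight scala %}
import org.apache.spark.mllib.regression.LabeledPoint

// "(y,[x1,x2,x3])" parses into a label plus a dense feature vector;
// predictOnValues then keeps that label as the key of each prediction.
val lp = LabeledPoint.parse("(7.0,[1.0,-0.5,2.0])")
println(lp.label)    // 7.0
println(lp.features) // [1.0,-0.5,2.0]
{% endhighlight %}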
From 9f7aea9eac3c64f646d1783909e0e2d155663399 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 28 Oct 2014 22:24:39 -0700 Subject: [PATCH 18/21] Style fixes --- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 2520b89ac0569..3c8a84c3db104 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -243,10 +243,11 @@ class StreamingKMeans( data.mapValues(model.predict) } - /** Check whether cluster centers have been initialized.*/ + /** Check whether cluster centers have been initialized. */ def assertInitialized(): Unit = { if (Option(model.clusterCenters) == None) { - throw new IllegalStateException("Initial cluster centers must be set before starting predictions") + throw new IllegalStateException( + "Initial cluster centers must be set before starting predictions") } } From 0411bf563bf2296d3a56b1a60bb5e4e1f2789981 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 31 Oct 2014 02:03:17 -0700 Subject: [PATCH 19/21] Change decay parameterization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use a single halfLife parameter that now determines the decay factor directly - Allow specification of timeUnit for the halfLife as “batches” or “points” - Documentation adjusted accordingly --- docs/mllib-clustering.md | 8 +- .../mllib/clustering/StreamingKMeans.scala | 68 ++++++-------- .../clustering/StreamingKMeansSuite.scala | 89 ------------------- 3 files changed, 35 insertions(+), 130 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index d0b258b1678b6..f9bb0d9989de7 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -174,7 +174,13 @@ to the cluster thus far, `$x_t$` is the new cluster center from the current batc is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha$=1` all data will be used from the beginning; with `$\alpha$=0` only the most recent data will be used. This is analogous to an -exponentially-weighted moving average. +exponentially-weighted moving average. + +The decay can be specified using a `halfLife` parameter, which determines the +correct decay factor `a` such that, for data acquired +at time `t`, its contribution by time `t + halfLife` will have dropped to 0.5. +The unit of time can be specified either as `batches` or `points` and the update rule +will be adjusted accordingly. ### Examples diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 3c8a84c3db104..3a6451118ca5e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -39,28 +39,28 @@ import org.apache.spark.util.Utils * * The update algorithm uses the "mini-batch" KMeans rule, * generalized to incorporate forgetfullness (i.e. decay). 
- * The basic update rule (for each cluster) is: + * The update rule (for each cluster) is: * - * c_t+1 = [(c_t * n_t) + (x_t * m_t)] / [n_t + m_t] - * n_t+t = n_t + m_t + * c_t+1 = [(c_t * n_t * a) + (x_t * m_t)] / [n_t + m_t] + * n_t+t = n_t * a + m_t * * Where c_t is the previously estimated centroid for that cluster, * n_t is the number of points assigned to it thus far, x_t is the centroid * estimated on the current batch, and m_t is the number of points assigned * to that centroid in the current batch. * - * This update rule is modified with a decay factor 'a' that scales - * the contribution of the clusters as estimated thus far. - * If a=1, all batches are weighted equally. If a=0, new centroids + * The decay factor 'a' scales the contribution of the clusters as estimated thus far, + * by applying a as a discount weighting on the current point when evaluating + * new incoming data. If a=1, all batches are weighted equally. If a=0, new centroids * are determined entirely by recent data. Lower values correspond to * more forgetting. * - * Decay can optionally be specified as a decay fraction 'q', - * which corresponds to the fraction of batches (or points) - * after which the past will be reduced to a contribution of 0.5. - * This decay fraction can be specified in units of 'points' or 'batches'. - * if 'batches', behavior will be independent of the number of points per batch; - * if 'points', the expected number of points per batch must be specified. + * Decay can optionally be specified by a half life and associated + * time unit. The time unit can either be a batch of data or a single + * data point. Considering data arrived at time t, the half life h is defined + * such that at time t + h the discount applied to the data from t is 0.5. + * The definition remains the same whether the time unit is given + * as batches or points. * */ @DeveloperApi @@ -69,7 +69,7 @@ class StreamingKMeansModel( val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging { /** Perform a k-means update on a batch of data. */ - def update(data: RDD[Vector], a: Double, units: String): StreamingKMeansModel = { + def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = { val centers = clusterCenters val counts = clusterCounts @@ -94,12 +94,12 @@ class StreamingKMeansModel( val newCount = count val newCentroid = mean / newCount.toDouble // compute the normalized scale factor that controls forgetting - val decayFactor = units match { - case "batches" => newCount / (a * oldCount + newCount) - case "points" => newCount / (math.pow(a, newCount) * oldCount + newCount) + val lambda = timeUnit match { + case "batches" => newCount / (decayFactor * oldCount + newCount) + case "points" => newCount / (math.pow(decayFactor, newCount) * oldCount + newCount) } // perform the update - val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * decayFactor + val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * lambda // store the new counts and centers counts(label) = oldCount + newCount centers(label) = Vectors.fromBreeze(updatedCentroid) @@ -134,8 +134,8 @@ class StreamingKMeansModel( @DeveloperApi class StreamingKMeans( var k: Int, - var a: Double, - var units: String) extends Logging { + var decayFactor: Double, + var timeUnit: String) extends Logging { protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) @@ -149,30 +149,18 @@ class StreamingKMeans( /** Set the decay factor directly (for forgetful algorithms). 
*/ def setDecayFactor(a: Double): this.type = { - this.a = a + this.decayFactor = a this } - /** Set the decay units for forgetful algorithms ("batches" or "points"). */ - def setUnits(units: String): this.type = { - if (units != "batches" && units != "points") { - throw new IllegalArgumentException("Invalid units for decay: " + units) + /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */ + def setHalfLife(halfLife: Double, timeUnit: String): this.type = { + if (timeUnit != "batches" && timeUnit != "points") { + throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) } - this.units = units - this - } - - /** Set decay fraction in units of batches. */ - def setDecayFractionBatches(q: Double): this.type = { - this.a = math.log(1 - q) / math.log(0.5) - this.units = "batches" - this - } - - /** Set decay fraction in units of points. Must specify expected number of points per batch. */ - def setDecayFractionPoints(q: Double, m: Double): this.type = { - this.a = math.pow(math.log(1 - q) / math.log(0.5), 1/m) - this.units = "points" + this.decayFactor = math.exp(math.log(0.5) / halfLife) + logInfo("Setting decay factor to: %g ".format (this.decayFactor)) + this.timeUnit = timeUnit this } @@ -216,7 +204,7 @@ class StreamingKMeans( def trainOn(data: DStream[Vector]) { this.assertInitialized() data.foreachRDD { (rdd, time) => - model = model.update(rdd, this.a, this.units) + model = model.update(rdd, this.decayFactor, this.timeUnit) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala index 5c23b04961df2..de79c7026a696 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.mllib.clustering -import scala.collection.mutable.ArrayBuffer import scala.util.Random import org.scalatest.FunSuite @@ -98,94 +97,6 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { } - test("drifting with fractional decay in units of batches") { - - val numBatches1 = 50 - val numBatches2 = 50 - val numPoints = 1 - val q = 0.25 - val k = 1 - val d = 1 - val r = 2.0 - - // create model with two clusters - val model = new StreamingKMeans() - .setK(1) - .setDecayFractionBatches(q) - .setInitialCenters(Array(Vectors.dense(0.0))) - - // create two batches of data with different, pre-specified centers - // to simulate a transition from one cluster to another - val (input1, centers1) = StreamingKMeansDataGenerator( - numPoints, numBatches1, k, d, r, 42, initCenters = Array(Vectors.dense(100.0))) - val (input2, centers2) = StreamingKMeansDataGenerator( - numPoints, numBatches2, k, d, r, 84, initCenters = Array(Vectors.dense(0.0))) - - // store the history - val history = new ArrayBuffer[Double](numBatches1 + numBatches2) - - // setup and run the model training - val ssc = setupStreams(input1 ++ input2, (inputDStream: DStream[Vector]) => { - model.trainOn(inputDStream) - // extract the center (in this case one-dimensional) - inputDStream.foreachRDD(x => history.append(model.latestModel().clusterCenters(0)(0))) - inputDStream.count() - }) - runStreams(ssc, numBatches1 + numBatches2, numBatches1 + numBatches2) - - // check that the fraction of batches required to reach 50 - // equals the setting of q, by finding the index of the first batch - below 50 and
comparing to total number of batches received - val halvedIndex = history.zipWithIndex.filter( x => x._1 < 50)(0)._2.toDouble - val fraction = (halvedIndex - numBatches1.toDouble) / halvedIndex - assert(fraction ~== q absTol 1E-1) - - } - - test("drifting with fractional decay in units of points") { - - val numBatches1 = 50 - val numBatches2 = 50 - val numPoints = 10 - val q = 0.25 - val k = 1 - val d = 1 - val r = 2.0 - - // create model with two clusters - val model = new StreamingKMeans() - .setK(1) - .setDecayFractionPoints(q, numPoints) - .setInitialCenters(Array(Vectors.dense(0.0))) - - // create two batches of data with different, pre-specified centers - // to simulate a transition from one cluster to another - val (input1, centers1) = StreamingKMeansDataGenerator( - numPoints, numBatches1, k, d, r, 42, initCenters = Array(Vectors.dense(100.0))) - val (input2, centers2) = StreamingKMeansDataGenerator( - numPoints, numBatches2, k, d, r, 84, initCenters = Array(Vectors.dense(0.0))) - - // store the history - val history = new ArrayBuffer[Double](numBatches1 + numBatches2) - - // setup and run the model training - val ssc = setupStreams(input1 ++ input2, (inputDStream: DStream[Vector]) => { - model.trainOn(inputDStream) - // extract the center (in this case one-dimensional) - inputDStream.foreachRDD(x => history.append(model.latestModel().clusterCenters(0)(0))) - inputDStream.count() - }) - runStreams(ssc, numBatches1 + numBatches2, numBatches1 + numBatches2) - - // check that the fraction of batches required to reach 50 - // equals the setting of q, by finding the index of the first batch - // below 50 and comparing to total number of batches received - val halvedIndex = history.zipWithIndex.filter( x => x._1 < 50)(0)._2.toDouble - val fraction = (halvedIndex - numBatches1.toDouble) / halvedIndex - assert(fraction ~== q absTol 1E-1) - - } - def StreamingKMeansDataGenerator( numPoints: Int, numBatches: Int, From 2e682c0cf87a6831dade11f96ddd8a7c76eade3b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 31 Oct 2014 12:38:06 -0700 Subject: [PATCH 20/21] take discount on previous weights; use BLAS; detect dying clusters --- .../examples/mllib/StreamingKMeans.scala | 5 +- .../mllib/clustering/StreamingKMeans.scala | 158 ++++++++++-------- .../clustering/StreamingKMeansSuite.scala | 76 ++++++--- 3 files changed, 148 insertions(+), 91 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala index 79416ae734c52..33e5760aed997 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala @@ -50,7 +50,6 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamingKMeans { def main(args: Array[String]) { - if (args.length != 5) { System.err.println( "Usage: StreamingKMeans " + @@ -67,14 +66,12 @@ object StreamingKMeans { val model = new StreamingKMeans() .setK(args(3).toInt) .setDecayFactor(1.0) - .setRandomCenters(args(4).toInt) + .setRandomCenters(args(4).toInt, 0.0) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination() - } - } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 3a6451118ca5e..5919c3d30a277 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -19,16 +19,15 @@ package org.apache.spark.mllib.clustering import scala.reflect.ClassTag -import breeze.linalg.{Vector => BV} - -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.Logging -import org.apache.spark.mllib.linalg.{Vectors, Vector} -import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors} +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils +import org.apache.spark.util.random.XORShiftRandom /** * :: DeveloperApi :: @@ -66,55 +65,81 @@ import org.apache.spark.util.Utils @DeveloperApi class StreamingKMeansModel( override val clusterCenters: Array[Vector], - val clusterCounts: Array[Long]) extends KMeansModel(clusterCenters) with Logging { + val clusterWeights: Array[Double]) extends KMeansModel(clusterCenters) with Logging { /** Perform a k-means update on a batch of data. */ def update(data: RDD[Vector], decayFactor: Double, timeUnit: String): StreamingKMeansModel = { - val centers = clusterCenters - val counts = clusterCounts - // find nearest cluster to each point - val closest = data.map(point => (this.predict(point), (point.toBreeze, 1.toLong))) + val closest = data.map(point => (this.predict(point), (point, 1L))) // get sums and counts for updating each cluster - type WeightedPoint = (BV[Double], Long) - def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = { - (p1._1 += p2._1, p1._2 + p2._2) + val mergeContribs: ((Vector, Long), (Vector, Long)) => (Vector, Long) = (p1, p2) => { + BLAS.axpy(1.0, p2._1, p1._1) + (p1._1, p1._2 + p2._2) } - val pointStats: Array[(Int, (BV[Double], Long))] = - closest.reduceByKey(mergeContribs).collect() + val dim = clusterCenters(0).size + val pointStats: Array[(Int, (Vector, Long))] = closest + .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs) + .collect() + + val discount = timeUnit match { + case StreamingKMeans.BATCHES => decayFactor + case StreamingKMeans.POINTS => + val numNewPoints = pointStats.view.map { case (_, (_, n)) => + n + }.sum + math.pow(decayFactor, numNewPoints) + } + + // apply discount to weights + BLAS.scal(discount, Vectors.dense(clusterWeights)) // implement update rule - pointStats.foreach { case (label, (mean, count)) => - // store old count and centroid - val oldCount = counts(label) - val oldCentroid = centers(label).toBreeze - // get new count and centroid - val newCount = count - val newCentroid = mean / newCount.toDouble - // compute the normalized scale factor that controls forgetting - val lambda = timeUnit match { - case "batches" => newCount / (decayFactor * oldCount + newCount) - case "points" => newCount / (math.pow(decayFactor, newCount) * oldCount + newCount) - } - // perform the update - val updatedCentroid = oldCentroid + (newCentroid - oldCentroid) * lambda - // store the new counts and centers - counts(label) = oldCount + newCount - centers(label) = Vectors.fromBreeze(updatedCentroid) + pointStats.foreach { case (label, (sum, count)) => + val centroid = clusterCenters(label) + + val updatedWeight = clusterWeights(label) + count + val lambda = count / math.max(updatedWeight, 1e-16) + + 
clusterWeights(label) = updatedWeight + BLAS.scal(1.0 - lambda, centroid) + BLAS.axpy(lambda / count, sum, centroid) // display the updated cluster centers - val display = centers(label).size match { - case x if x > 100 => centers(label).toArray.take(100).mkString("[", ",", "...") - case _ => centers(label).toArray.mkString("[", ",", "]") + val display = clusterCenters(label).size match { + case x if x > 100 => centroid.toArray.take(100).mkString("[", ",", "...") + case _ => centroid.toArray.mkString("[", ",", "]") + } + + logInfo(s"Cluster $label updated with weight $updatedWeight and centroid: $display") + } + + // Check whether the smallest cluster is dying. If so, split the largest cluster. + val weightsWithIndex = clusterWeights.view.zipWithIndex + val (maxWeight, largest) = weightsWithIndex.maxBy(_._1) + val (minWeight, smallest) = weightsWithIndex.minBy(_._1) + if (minWeight < 1e-8 * maxWeight) { + logInfo(s"Cluster $smallest is dying. Split the largest cluster $largest into two.") + val weight = (maxWeight + minWeight) / 2.0 + clusterWeights(largest) = weight + clusterWeights(smallest) = weight + val largestClusterCenter = clusterCenters(largest) + val smallestClusterCenter = clusterCenters(smallest) + var j = 0 + while (j < dim) { + val x = largestClusterCenter(j) + val p = 1e-14 * math.max(math.abs(x), 1.0) + largestClusterCenter.toBreeze(j) = x + p + smallestClusterCenter.toBreeze(j) = x - p + j += 1 } - logInfo("Cluster %d updated: %s ".format (label, display)) } - new StreamingKMeansModel(centers, counts) - } + this + } } + /** * :: DeveloperApi :: * StreamingKMeans provides methods for configuring a @@ -128,7 +153,7 @@ class StreamingKMeansModel( * val model = new StreamingKMeans() * .setDecayFactor(0.5) * .setK(3) - * .setRandomCenters(5) + * .setRandomCenters(5, 100.0) * .trainOn(DStream) */ @DeveloperApi @@ -137,9 +162,9 @@ class StreamingKMeans( var decayFactor: Double, var timeUnit: String) extends Logging { - protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) + def this() = this(2, 1.0, StreamingKMeans.BATCHES) - def this() = this(2, 1.0, "batches") + protected var model: StreamingKMeansModel = new StreamingKMeansModel(null, null) /** Set the number of clusters. */ def setK(k: Int): this.type = { @@ -155,7 +180,7 @@ class StreamingKMeans( /** Set the half life and time unit ("batches" or "points") for forgetful algorithms. */ def setHalfLife(halfLife: Double, timeUnit: String): this.type = { - if (timeUnit != "batches" && timeUnit != "points") { + if (timeUnit != StreamingKMeans.BATCHES && timeUnit != StreamingKMeans.POINTS) { throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit) } this.decayFactor = math.exp(math.log(0.5) / halfLife) @@ -165,26 +190,23 @@ class StreamingKMeans( } /** Specify initial centers directly. */ - def setInitialCenters(initialCenters: Array[Vector]): this.type = { - val clusterCounts = new Array[Long](this.k) - this.model = new StreamingKMeansModel(initialCenters, clusterCounts) + def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = { + model = new StreamingKMeansModel(centers, weights) this } - /** Initialize random centers, requiring only the number of dimensions. 
- * - * @param dim Number of dimensions - * @param seed Random seed - * */ - def setRandomCenters(dim: Int, seed: Long = Utils.random.nextLong): this.type = { - - val random = Utils.random - random.setSeed(seed) - - val initialCenters = (0 until k) - .map(_ => Vectors.dense(Array.fill(dim)(random.nextGaussian()))).toArray - val clusterCounts = new Array[Long](this.k) - this.model = new StreamingKMeansModel(initialCenters, clusterCounts) + /** + * Initialize random centers, requiring only the number of dimensions. + * + * @param dim Number of dimensions + * @param weight Weight for each center + * @param seed Random seed + */ + def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = { + val random = new XORShiftRandom(seed) + val centers = Array.fill(k)(Vectors.dense(Array.fill(dim)(random.nextGaussian()))) + val weights = Array.fill(k)(weight) + model = new StreamingKMeansModel(centers, weights) this } @@ -202,9 +224,9 @@ class StreamingKMeans( * @param data DStream containing vector data */ def trainOn(data: DStream[Vector]) { - this.assertInitialized() + assertInitialized() data.foreachRDD { (rdd, time) => - model = model.update(rdd, this.decayFactor, this.timeUnit) + model = model.update(rdd, decayFactor, timeUnit) } } @@ -215,7 +237,7 @@ class StreamingKMeans( * @return DStream containing predictions */ def predictOn(data: DStream[Vector]): DStream[Int] = { - this.assertInitialized() + assertInitialized() data.map(model.predict) } @@ -227,16 +249,20 @@ class StreamingKMeans( * @return DStream containing the input keys and the predictions as values */ def predictOnValues[K: ClassTag](data: DStream[(K, Vector)]): DStream[(K, Int)] = { - this.assertInitialized() + assertInitialized() data.mapValues(model.predict) } /** Check whether cluster centers have been initialized. 
*/ - def assertInitialized(): Unit = { - if (Option(model.clusterCenters) == None) { + private[this] def assertInitialized(): Unit = { + if (model.clusterCenters == null) { throw new IllegalStateException( "Initial cluster centers must be set before starting predictions") } } +} +private[clustering] object StreamingKMeans { + final val BATCHES = "batches" + final val POINTS = "points" } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala index de79c7026a696..850c9fce507cd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala @@ -17,21 +17,19 @@ package org.apache.spark.mllib.clustering -import scala.util.Random - import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{Vectors, Vector} +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.TestingUtils._ -import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.TestSuiteBase +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.util.random.XORShiftRandom class StreamingKMeansSuite extends FunSuite with TestSuiteBase { override def maxWaitTimeMillis = 30000 test("accuracy for single center and equivalence to grand average") { - // set parameters val numBatches = 10 val numPoints = 50 @@ -43,9 +41,9 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { val model = new StreamingKMeans() .setK(1) .setDecayFactor(1.0) - .setInitialCenters(Array(Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0))) + .setInitialCenters(Array(Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0)), Array(0.0)) - // generate random data for kmeans + // generate random data for k-means val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42) // setup and run the model training @@ -60,13 +58,12 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { // estimated center from streaming should exactly match the arithmetic mean of all data points // because the decay factor is set to 1.0 - val grandMean = input.flatten.map(x => x.toBreeze).reduce(_+_) / (numBatches * numPoints).toDouble + val grandMean = + input.flatten.map(x => x.toBreeze).reduce(_+_) / (numBatches * numPoints).toDouble assert(model.latestModel().clusterCenters(0) ~== Vectors.dense(grandMean.toArray) absTol 1E-5) - } test("accuracy for two centers") { - val numBatches = 10 val numPoints = 5 val k = 2 @@ -74,27 +71,66 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { val r = 0.1 // create model with two clusters - val model = new StreamingKMeans() + val kMeans = new StreamingKMeans() .setK(2) - .setDecayFactor(1.0) - .setInitialCenters(Array(Vectors.dense(-0.1, 0.1, -0.2, -0.3, -0.1), - Vectors.dense(0.1, -0.2, 0.0, 0.2, 0.1))) + .setHalfLife(2, "batches") + .setInitialCenters( + Array(Vectors.dense(-0.1, 0.1, -0.2, -0.3, -0.1), + Vectors.dense(0.1, -0.2, 0.0, 0.2, 0.1)), + Array(5.0, 5.0)) - // generate random data for kmeans + // generate random data for k-means val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42) // setup and run the model training val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => { - model.trainOn(inputDStream) + kMeans.trainOn(inputDStream) inputDStream.count() }) runStreams(ssc, numBatches, numBatches) // check that estimated centers are 
close to true centers // NOTE exact assignment depends on the initialization! - assert(centers(0) ~== model.latestModel().clusterCenters(0) absTol 1E-1) - assert(centers(1) ~== model.latestModel().clusterCenters(1) absTol 1E-1) + assert(centers(0) ~== kMeans.latestModel().clusterCenters(0) absTol 1E-1) + assert(centers(1) ~== kMeans.latestModel().clusterCenters(1) absTol 1E-1) + } + + test("detecting dying clusters") { + val numBatches = 10 + val numPoints = 5 + val k = 1 + val d = 1 + val r = 1.0 + // create model with two clusters + val kMeans = new StreamingKMeans() + .setK(2) + .setHalfLife(0.5, "points") + .setInitialCenters( + Array(Vectors.dense(0.0), Vectors.dense(1000.0)), + Array(1.0, 1.0)) + + // new data are all around the first cluster 0.0 + val (input, _) = + StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42, Array(Vectors.dense(0.0))) + + // setup and run the model training + val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => { + kMeans.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + + // check that estimated centers are close to true centers + // NOTE exact assignment depends on the initialization! + val model = kMeans.latestModel() + val c0 = model.clusterCenters(0)(0) + val c1 = model.clusterCenters(1)(0) + + assert(c0 * c1 < 0.0, "should have one positive center and one negative center") + // 0.8 is the mean of half-normal distribution + assert(math.abs(c0) ~== 0.8 absTol 0.6) + assert(math.abs(c1) ~== 0.8 absTol 0.6) } def StreamingKMeansDataGenerator( @@ -105,7 +141,7 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { r: Double, seed: Int, initCenters: Array[Vector] = null): (IndexedSeq[IndexedSeq[Vector]], Array[Vector]) = { - val rand = new Random(seed) + val rand = new XORShiftRandom(seed) val centers = initCenters match { case null => Array.fill(k)(Vectors.dense(Array.fill(d)(rand.nextGaussian()))) case _ => initCenters @@ -118,6 +154,4 @@ class StreamingKMeansSuite extends FunSuite with TestSuiteBase { } (data, centers) } - - } From b2e5b4a167e0e5835f3518d2b68e4063c3f9c955 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 31 Oct 2014 20:31:23 -0700 Subject: [PATCH 21/21] Fixes to docs / examples --- docs/mllib-clustering.md | 2 +- .../org/apache/spark/mllib/clustering/StreamingKMeans.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index f9bb0d9989de7..c696ae9c8e8c8 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -220,7 +220,7 @@ val numClusters = 2 val model = new StreamingKMeans() .setK(numClusters) .setDecayFactor(1.0) - .setRandomWeights(numDimensions) + .setRandomCenters(numDimensions, 0.0) {% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index 5919c3d30a277..6189dce9b27da 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -32,9 +32,9 @@ import org.apache.spark.util.random.XORShiftRandom /** * :: DeveloperApi :: * StreamingKMeansModel extends MLlib's KMeansModel for streaming - * algorithms, so it can keep track of the number of points assigned - * to each cluster, and also update the model by doing a single iteration - * of the standard k-means algorithm. 
+ * algorithms, so it can keep track of a continuously updated weight + * associated with each cluster, and also update the model by + * doing a single iteration of the standard k-means algorithm. * * The update algorithm uses the "mini-batch" KMeans rule, * generalized to incorporate forgetfulness (i.e. decay).
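
End-to-end usage of the API as it stands after this series, for reference. This is an illustrative sketch only: the application name, batch duration, input paths, and parameter values (k = 2, 3 dimensions, a half-life of 5 batches) are placeholders and not part of the patches; `setHalfLife` computes `decayFactor = exp(log(0.5) / halfLife)` as introduced above, and random centers now require an initial weight.

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansSketch")
val ssc = new StreamingContext(conf, Seconds(1))

// whitespace-delimited vectors / labeled points in text files (placeholder paths)
val trainingData = ssc.textFileStream("training/dir").map(Vectors.parse)
val testData = ssc.textFileStream("test/dir").map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(2)
  // a half-life of 5 batches implies decayFactor = exp(log(0.5) / 5), roughly 0.87
  .setHalfLife(5.0, "batches")
  // random initial centers in 3 dimensions, each starting with weight 0.0
  .setRandomCenters(3, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}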