From 0898add2e1dd2f1faac9e8d08c758994af03ee6e Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 10 Jul 2014 10:39:31 -0400 Subject: [PATCH 01/30] Added dependency on streaming --- mllib/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mllib/pom.xml b/mllib/pom.xml index b622f96dd7901..b8a3f755f53a4 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -37,6 +37,11 @@ spark-core_${scala.binary.version} ${project.version} + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + org.eclipse.jetty jetty-server From d99aa85d8f275ca605aacb2804f0c55fff10ff2b Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 10 Jul 2014 10:40:55 -0400 Subject: [PATCH 02/30] Helper methods for streaming MLlib apps --- .../spark/mllib/util/MLStreamingUtils.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala new file mode 100644 index 0000000000000..4099f1a510edf --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala @@ -0,0 +1,25 @@ +package org.apache.spark.mllib.util + +import org.apache.spark.annotation.Experimental +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} + +/** + * Helper methods to load streaming data for MLLib applications. + */ +@Experimental +object MLStreamingUtils { + + /** + * Loads streaming labeled points from a stream of text files + * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`. + * + * @param ssc Streaming context + * @param path Directory path in any Hadoop-supported file system URI + * @return Labeled points stored as a DStream[LabeledPoint] + */ + def loadLabeledPointsFromText(ssc: StreamingContext, path: String): DStream[LabeledPoint] = + ssc.textFileStream(path).map(LabeledPointParser.parse) + +} From 604f4d738357adccc0168f8449614e8e09d9f70e Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 10 Jul 2014 10:41:25 -0400 Subject: [PATCH 03/30] Expanded private class to include mllib --- .../org/apache/spark/mllib/regression/LinearRegression.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala index 0ebad4eb58d88..ca7e15c4714bd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala @@ -49,7 +49,7 @@ class LinearRegressionModel private[mllib] ( * its corresponding right hand side label y. * See also the documentation for the precise formulation. 
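 * (For reference, a sketch of the objective under one common least-squares formulation,
 * not spelled out in this patch:
 *   f(weights) = 1/n * ||A * weights - y||^2,
 * i.e. the mean squared error, where the rows of A are the feature vectors x and y collects their labels.)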
*/ -class LinearRegressionWithSGD private ( +class LinearRegressionWithSGD private[mllib] ( private var stepSize: Double, private var numIterations: Int, private var miniBatchFraction: Double) @@ -68,7 +68,7 @@ class LinearRegressionWithSGD private ( */ def this() = this(1.0, 100, 1.0) - override protected def createModel(weights: Vector, intercept: Double) = { + override protected[mllib] def createModel(weights: Vector, intercept: Double) = { new LinearRegressionModel(weights, intercept) } } From c4b1143dc2ab39506aeefb2f7a89485196308d08 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 10 Jul 2014 10:43:16 -0400 Subject: [PATCH 04/30] Streaming linear regression - Abstract class to support a variety of streaming regression analyses - Example concrete class for streaming linear regression - Example usage: continually train on one data stream and test on another --- .../mllib/StreamingLinearRegression.scala | 55 +++++++++ .../StreamingLinearRegression.scala | 104 ++++++++++++++++++ .../regression/StreamingRegression.scala | 82 ++++++++++++++ 3 files changed, 241 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala new file mode 100644 index 0000000000000..3b212e9755d96 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.SparkConf
+import org.apache.spark.mllib.util.MLStreamingUtils
+import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Continually update a model on one stream of data using streaming linear regression,
+ * while making predictions on another stream of data.
+ *
+ */
+object StreamingLinearRegression {
+
+  def main(args: Array[String]) {
+
+    if (args.length != 4) {
+      System.err.println("Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
+      System.exit(1)
+    }
+
+    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
+    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
+
+    val trainingData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(0))
+    val testData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(1))
+
+    val model = StreamingLinearRegressionWithSGD.start(args(3).toInt)
+
+    model.trainOn(trainingData)
+    model.predictOn(testData).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+
+  }
+
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala
new file mode 100644
index 0000000000000..065653fb38d38
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.regression
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Train or predict a linear regression model on streaming data. Training uses
+ * Stochastic Gradient Descent to update the model based on each new batch of
+ * incoming data from a DStream (see LinearRegressionWithSGD for the model equation).
+ *
+ * Each batch of data is assumed to be an RDD of LabeledPoints.
+ * The number of data points per batch can vary, but the number
+ * of features must be constant.
+ */
+@Experimental
+class StreamingLinearRegressionWithSGD private (
+    private var stepSize: Double,
+    private var numIterations: Int,
+    private var miniBatchFraction: Double,
+    private var numFeatures: Int)
+  extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
+
+  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+
+  var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
+
+}
+
+/**
+ * Top-level methods for calling StreamingLinearRegressionWithSGD.
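+ *
+ * A hypothetical usage sketch (the stream names are illustrative, not part of this patch):
+ * {{{
+ *   val model = StreamingLinearRegressionWithSGD.start(numFeatures)
+ *   model.trainOn(trainingStream)
+ *   model.predictOn(testStream).print()
+ * }}}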
+ */ +@Experimental +object StreamingLinearRegressionWithSGD { + + /** + * Start a streaming Linear Regression model by setting optimization parameters. + * + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param numFeatures Number of features per record, must be constant for all batches of data. + */ + def start( + stepSize: Double, + numIterations: Int, + miniBatchFraction: Double, + numFeatures: Int): StreamingLinearRegressionWithSGD = { + new StreamingLinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, numFeatures) + } + + /** + * Start a streaming Linear Regression model by setting optimization parameters. + * + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param numFeatures Number of features per record, must be constant for all batches of data. + */ + def start( + numIterations: Int, + stepSize: Double, + numFeatures: Int): StreamingLinearRegressionWithSGD = { + start(stepSize, numIterations, 1.0, numFeatures) + } + + /** + * Start a streaming Linear Regression model by setting optimization parameters. + * + * @param numIterations Number of iterations of gradient descent to run. + * @param numFeatures Number of features per record, must be constant for all batches of data. + */ + def start( + numIterations: Int, + numFeatures: Int): StreamingLinearRegressionWithSGD = { + start(0.1, numIterations, 1.0, numFeatures) + } + + /** + * Start a streaming Linear Regression model by setting optimization parameters. + * + * @param numFeatures Number of features per record, must be constant for all batches of data. + */ + def start( + numFeatures: Int): StreamingLinearRegressionWithSGD = { + start(0.1, 100, 1.0, numFeatures) + } + +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala new file mode 100644 index 0000000000000..b693156480784 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.streaming.dstream.DStream + +/** + * :: DeveloperApi :: + * StreamingRegression implements methods for training + * a linear regression model on streaming data, and using it + * for prediction on streaming data. 
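+ *
+ * A minimal extension sketch (hypothetical subclass name; the wiring mirrors
+ * StreamingLinearRegressionWithSGD in this same patch):
+ * {{{
+ *   class MyStreamingRegression(numFeatures: Int)
+ *     extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] {
+ *     val algorithm = new LinearRegressionWithSGD(0.1, 50, 1.0)
+ *     var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
+ *   }
+ * }}}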
+ * + * This class takes as type parameters a GeneralizedLinearModel, + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct + * streaming versions of arbitrary regression analyses. For example usage, + * see StreamingLinearRegressionWithSGD. + * + */ +@DeveloperApi +@Experimental +abstract class StreamingRegression[M <: GeneralizedLinearModel, A <: GeneralizedLinearAlgorithm[M]] extends Logging { + + /** The model to be updated and used for prediction. */ + var model: M + + /** The algorithm to use for updating. */ + val algorithm: A + + /** Log the latest model parameters and return the model. */ + def latest(): M = { + logInfo("Latest model: weights, %s".format(model.weights.toString)) + logInfo("Latest model: intercept, %s".format(model.intercept.toString)) + model + } + + /** + * Update the model by training on batches of data from a DStream. + * This operation registers a DStream for training the model, + * and updates the model based on every subsequent non-empty + * batch of data from the stream. + * + * @param data DStream containing labeled data + */ + def trainOn(data: DStream[LabeledPoint]) { + data.foreachRDD{ + rdd => + if (rdd.count() > 0) { + model = algorithm.run(rdd, model.weights) + logInfo("Model updated") + } + this.latest() + } + } + + /** + * Use the model to make predictions on batches of data from a DStream + * + * @param data DStream containing labeled data + * @return DStream containing predictions + */ + def predictOn(data: DStream[LabeledPoint]): DStream[Double] = { + data.map(x => model.predict(x.features)) + } + +} \ No newline at end of file From 453974e75afbebfc605e80efaa32e8f45dc0e258 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 10 Jul 2014 15:36:14 -0400 Subject: [PATCH 05/30] Fixed indentation --- .../regression/StreamingLinearRegression.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index 065653fb38d38..7078a03bd9258 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -58,10 +58,10 @@ object StreamingLinearRegressionWithSGD { * @param numFeatures Number of features per record, must be constant for all batches of data. */ def start( - stepSize: Double, - numIterations: Int, - miniBatchFraction: Double, - numFeatures: Int): StreamingLinearRegressionWithSGD = { + stepSize: Double, + numIterations: Int, + miniBatchFraction: Double, + numFeatures: Int): StreamingLinearRegressionWithSGD = { new StreamingLinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, numFeatures) } @@ -73,9 +73,9 @@ object StreamingLinearRegressionWithSGD { * @param numFeatures Number of features per record, must be constant for all batches of data. */ def start( - numIterations: Int, - stepSize: Double, - numFeatures: Int): StreamingLinearRegressionWithSGD = { + numIterations: Int, + stepSize: Double, + numFeatures: Int): StreamingLinearRegressionWithSGD = { start(stepSize, numIterations, 1.0, numFeatures) } @@ -86,8 +86,8 @@ object StreamingLinearRegressionWithSGD { * @param numFeatures Number of features per record, must be constant for all batches of data. 
   */
  def start(
-    numIterations: Int,
-    numFeatures: Int): StreamingLinearRegressionWithSGD = {
+      numIterations: Int,
+      numFeatures: Int): StreamingLinearRegressionWithSGD = {
    start(0.1, numIterations, 1.0, numFeatures)
  }

From fd31e036afe537b86d49487527eca83ac62c7630 Mon Sep 17 00:00:00 2001
From: freeman
Date: Thu, 10 Jul 2014 15:49:32 -0400
Subject: [PATCH 06/30] Changed logging behavior

---
 .../spark/mllib/regression/StreamingRegression.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
index b693156480784..dbf5457fbcd01 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
@@ -43,10 +43,8 @@ abstract class StreamingRegression[M <: GeneralizedLinearModel, A <: Generalized
   /** The algorithm to use for updating. */
   val algorithm: A
 
-  /** Log the latest model parameters and return the model. */
+  /** Return the latest model. */
   def latest(): M = {
-    logInfo("Latest model: weights, %s".format(model.weights.toString))
-    logInfo("Latest model: intercept, %s".format(model.intercept.toString))
     model
   }
 
@@ -65,7 +63,8 @@ abstract class StreamingRegression[M <: GeneralizedLinearModel, A <: Generalized
         model = algorithm.run(rdd, model.weights)
         logInfo("Model updated")
       }
-      this.latest()
+      logInfo("Current model: weights, %s".format(model.weights.toString))
+      logInfo("Current model: intercept, %s".format(model.intercept.toString))
     }
   }

From fb4683a98f8ef04f18f7ea1e8341f8334120e6c6 Mon Sep 17 00:00:00 2001
From: freeman
Date: Mon, 14 Jul 2014 12:44:27 -0400
Subject: [PATCH 07/30] Minor changes for scalastyle consistency

---
 .../mllib/StreamingLinearRegression.scala       |  3 ++-
 .../regression/StreamingLinearRegression.scala  |  2 +-
 .../mllib/regression/StreamingRegression.scala  |  6 ++++--
 .../spark/mllib/util/MLStreamingUtils.scala     | 17 +++++++++++++++++
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
index 3b212e9755d96..b4ec2c36e728d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -32,7 +32,8 @@ object StreamingLinearRegression {
   def main(args: Array[String]) {
 
     if (args.length != 4) {
-      System.err.println("Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
+      System.err.println(
+        "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
       System.exit(1)
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala
index 7078a03bd9258..225c9e5b00f24 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala
@@ -101,4 +101,4 @@ object StreamingLinearRegressionWithSGD {
     start(0.1, 100, 1.0, numFeatures)
   }
 
-}
\ No newline at end of file
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
index 
dbf5457fbcd01..3ccfe1e3caf14 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala @@ -35,7 +35,9 @@ import org.apache.spark.streaming.dstream.DStream */ @DeveloperApi @Experimental -abstract class StreamingRegression[M <: GeneralizedLinearModel, A <: GeneralizedLinearAlgorithm[M]] extends Logging { +abstract class StreamingRegression[ + M <: GeneralizedLinearModel, + A <: GeneralizedLinearAlgorithm[M]] extends Logging { /** The model to be updated and used for prediction. */ var model: M @@ -78,4 +80,4 @@ abstract class StreamingRegression[M <: GeneralizedLinearModel, A <: Generalized data.map(x => model.predict(x.features)) } -} \ No newline at end of file +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala index 4099f1a510edf..6bc7b5250b307 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.mllib.util import org.apache.spark.annotation.Experimental From 86220bca21f1fc7a517ff193e1b1fafba7fe4ddc Mon Sep 17 00:00:00 2001 From: freeman Date: Mon, 14 Jul 2014 14:10:20 -0400 Subject: [PATCH 08/30] Streaming linear regression unit tests - Test parameter estimate accuracy after several updates - Test parameter accuracy improvement after each batch --- .../StreamingLinearRegressionSuite.scala | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala new file mode 100644 index 0000000000000..a9804dea68055 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import java.io.File + +import com.google.common.io.Files +import org.apache.commons.io.FileUtils +import org.scalatest.FunSuite +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} + +import scala.collection.mutable.ArrayBuffer + +class StreamingLinearRegressionSuite extends FunSuite { + + // Assert that two values are equal within tolerance epsilon + def assertEqual(v1: Double, v2: Double, epsilon: Double) { + def errorMessage = v1.toString + " did not equal " + v2.toString + assert(math.abs(v1-v2) <= epsilon, errorMessage) + } + + // Assert that model predictions are correct + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data + test("streaming linear regression parameter accuracy") { + + val conf = new SparkConf().setMaster("local").setAppName("streaming test") + val testDir = Files.createTempDir() + val numBatches = 10 + val ssc = new StreamingContext(conf, Seconds(1)) + val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) + val model = StreamingLinearRegressionWithSGD.start(numFeatures=2, numIterations=50) + + model.trainOn(data) + + ssc.start() + + // write data to a file stream + Thread.sleep(5000) + for (i <- 0 until numBatches) { + val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) + val file = new File(testDir, i.toString) + FileUtils.writeStringToFile(file, samples.map(x => x.toString).mkString("\n")) + Thread.sleep(Milliseconds(1000).milliseconds) + } + Thread.sleep(Milliseconds(5000).milliseconds) + + ssc.stop() + + System.clearProperty("spark.driver.port") + FileUtils.deleteDirectory(testDir) + + // check accuracy of final parameter estimates + assertEqual(model.latest().intercept, 0.0, 0.1) + assertEqual(model.latest().weights(0), 10.0, 0.1) + assertEqual(model.latest().weights(1), 10.0, 0.1) + + // check accuracy of predictions + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) + validatePrediction(validationData.map(row => model.latest().predict(row.features)), validationData) + } + + // Test that parameter estimates improve when learning Y = 10*X1 on streaming data + test("streaming linear regression parameter convergence") { + + val conf = new SparkConf().setMaster("local").setAppName("streaming test") + val testDir = Files.createTempDir() + val ssc = new StreamingContext(conf, Seconds(1)) + val numBatches = 5 + val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) + val model = 
StreamingLinearRegressionWithSGD.start(numFeatures=1, numIterations=50) + + model.trainOn(data) + + ssc.start() + + // write data to a file stream + val history = new ArrayBuffer[Double](numBatches) + Thread.sleep(5000) + for (i <- 0 until numBatches) { + val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) + val file = new File(testDir, i.toString) + FileUtils.writeStringToFile(file, samples.map(x => x.toString).mkString("\n")) + Thread.sleep(Milliseconds(1000).milliseconds) + history.append(math.abs(model.latest().weights(0) - 10.0)) + } + Thread.sleep(Milliseconds(5000).milliseconds) + + ssc.stop() + + System.clearProperty("spark.driver.port") + FileUtils.deleteDirectory(testDir) + + // check that error is always getting smaller + assert(history.drop(1).zip(history.dropRight(1)).forall(x => (x._1 - x._2) < 0)) + + } + +} From a2a63ad827994c86d517b7227c064420b80117c1 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 17 Jul 2014 20:51:38 -0400 Subject: [PATCH 09/30] Makes convergence test more robust - Slower simulated data rates and updates - Softens requirement for strict error reduction, but still ensures error stability, and error reduction on at least a subset of updates --- .../regression/StreamingLinearRegressionSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index a9804dea68055..9b9d9678df644 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -90,7 +90,7 @@ class StreamingLinearRegressionSuite extends FunSuite { val conf = new SparkConf().setMaster("local").setAppName("streaming test") val testDir = Files.createTempDir() - val ssc = new StreamingContext(conf, Seconds(1)) + val ssc = new StreamingContext(conf, Seconds(2)) val numBatches = 5 val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) val model = StreamingLinearRegressionWithSGD.start(numFeatures=1, numIterations=50) @@ -106,7 +106,7 @@ class StreamingLinearRegressionSuite extends FunSuite { val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) FileUtils.writeStringToFile(file, samples.map(x => x.toString).mkString("\n")) - Thread.sleep(Milliseconds(1000).milliseconds) + Thread.sleep(Milliseconds(6000).milliseconds) history.append(math.abs(model.latest().weights(0) - 10.0)) } Thread.sleep(Milliseconds(5000).milliseconds) @@ -116,8 +116,11 @@ class StreamingLinearRegressionSuite extends FunSuite { System.clearProperty("spark.driver.port") FileUtils.deleteDirectory(testDir) - // check that error is always getting smaller - assert(history.drop(1).zip(history.dropRight(1)).forall(x => (x._1 - x._2) < 0)) + val deltas = history.drop(1).zip(history.dropRight(1)) + // check error stability (it always either shrinks, or increases with small tol) + assert(deltas.forall(x => (x._1 - x._2) <= 0.1)) + // check that error shrunk on at least 2 batches + assert(deltas.map(x => if ((x._1 - x._2) < 0) 1 else 0).sum > 1) } From 6bfe1e6214a67f834138b16585f4729da602d925 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Wed, 30 Jul 2014 20:18:20 -0400 Subject: [PATCH 10/30] Fixed imports --- 
.../mllib/regression/StreamingLinearRegression.scala | 2 +- .../spark/mllib/regression/StreamingRegression.scala | 2 +- .../org/apache/spark/mllib/util/MLStreamingUtils.scala | 2 +- .../regression/StreamingLinearRegressionSuite.scala | 9 +++++---- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index 225c9e5b00f24..7b16a94af913c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -17,8 +17,8 @@ package org.apache.spark.mllib.regression -import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.linalg.Vectors /** * Train or predict a linear regression model on streaming data. Training uses diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala index 3ccfe1e3caf14..901db687c56b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala @@ -17,8 +17,8 @@ package org.apache.spark.mllib.regression -import org.apache.spark.Logging import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.Logging import org.apache.spark.streaming.dstream.DStream /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala index 6bc7b5250b307..b90adca8b24b5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala @@ -18,9 +18,9 @@ package org.apache.spark.mllib.util import org.apache.spark.annotation.Experimental +import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext -import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} /** * Helper methods to load streaming data for MLLib applications. 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 9b9d9678df644..8c1a8722761b1 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -19,14 +19,15 @@ package org.apache.spark.mllib.regression import java.io.File -import com.google.common.io.Files +import scala.collection.mutable.ArrayBuffer + import org.apache.commons.io.FileUtils +import com.google.common.io.Files import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} -import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} - -import scala.collection.mutable.ArrayBuffer class StreamingLinearRegressionSuite extends FunSuite { From 50dd2376a2abf40001ba7ddbdabf18dd599b1da1 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Wed, 30 Jul 2014 20:19:37 -0400 Subject: [PATCH 11/30] Removed experimental tag --- .../apache/spark/mllib/regression/StreamingRegression.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala index 901db687c56b7..9f65608572c9c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.regression -import org.apache.spark.annotation.{Experimental, DeveloperApi} +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.Logging import org.apache.spark.streaming.dstream.DStream @@ -34,7 +34,6 @@ import org.apache.spark.streaming.dstream.DStream * */ @DeveloperApi -@Experimental abstract class StreamingRegression[ M <: GeneralizedLinearModel, A <: GeneralizedLinearAlgorithm[M]] extends Logging { From 74188d6b2c9c0075ce11ca57fe3ab6f8667391aa Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Wed, 30 Jul 2014 23:19:45 -0400 Subject: [PATCH 12/30] Eliminate dependency on commons --- .../regression/StreamingLinearRegressionSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 8c1a8722761b1..e943c0f7899a9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -18,16 +18,17 @@ package org.apache.spark.mllib.regression import java.io.File +import java.nio.charset.Charset import scala.collection.mutable.ArrayBuffer -import org.apache.commons.io.FileUtils import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} +import org.apache.spark.util.Utils class 
StreamingLinearRegressionSuite extends FunSuite { @@ -66,7 +67,7 @@ class StreamingLinearRegressionSuite extends FunSuite { for (i <- 0 until numBatches) { val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) - FileUtils.writeStringToFile(file, samples.map(x => x.toString).mkString("\n")) + Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) Thread.sleep(Milliseconds(1000).milliseconds) } Thread.sleep(Milliseconds(5000).milliseconds) @@ -74,7 +75,7 @@ class StreamingLinearRegressionSuite extends FunSuite { ssc.stop() System.clearProperty("spark.driver.port") - FileUtils.deleteDirectory(testDir) + Utils.deleteRecursively(testDir) // check accuracy of final parameter estimates assertEqual(model.latest().intercept, 0.0, 0.1) @@ -106,7 +107,7 @@ class StreamingLinearRegressionSuite extends FunSuite { for (i <- 0 until numBatches) { val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) - FileUtils.writeStringToFile(file, samples.map(x => x.toString).mkString("\n")) + Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) Thread.sleep(Milliseconds(6000).milliseconds) history.append(math.abs(model.latest().weights(0) - 10.0)) } @@ -115,7 +116,7 @@ class StreamingLinearRegressionSuite extends FunSuite { ssc.stop() System.clearProperty("spark.driver.port") - FileUtils.deleteDirectory(testDir) + Utils.deleteRecursively(testDir) val deltas = history.drop(1).zip(history.dropRight(1)) // check error stability (it always either shrinks, or increases with small tol) From 4b0a5d320d0045710d2dddf9e72843ceaa6383de Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Thu, 31 Jul 2014 18:22:29 -0400 Subject: [PATCH 13/30] Cleaned up tests - Use LocalSparkContext from mllib.util - Clarified timing parameters and removed unnecessary delays --- .../StreamingLinearRegressionSuite.scala | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index e943c0f7899a9..6775e9a7db28d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -26,11 +26,10 @@ import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} -import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} +import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils -class StreamingLinearRegressionSuite extends FunSuite { +class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { // Assert that two values are equal within tolerance epsilon def assertEqual(v1: Double, v2: Double, epsilon: Double) { @@ -51,10 +50,10 @@ class StreamingLinearRegressionSuite extends FunSuite { // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data test("streaming linear regression parameter accuracy") { - val conf = new SparkConf().setMaster("local").setAppName("streaming test") val testDir = Files.createTempDir() val numBatches = 10 - val ssc = new 
StreamingContext(conf, Seconds(1)) + val batchDuration = Milliseconds(1000) + val ssc = new StreamingContext(sc, batchDuration) val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) val model = StreamingLinearRegressionWithSGD.start(numFeatures=2, numIterations=50) @@ -63,16 +62,14 @@ class StreamingLinearRegressionSuite extends FunSuite { ssc.start() // write data to a file stream - Thread.sleep(5000) for (i <- 0 until numBatches) { val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) - Thread.sleep(Milliseconds(1000).milliseconds) + Thread.sleep(batchDuration.milliseconds) } - Thread.sleep(Milliseconds(5000).milliseconds) - ssc.stop() + ssc.stop(stopSparkContext=false) System.clearProperty("spark.driver.port") Utils.deleteRecursively(testDir) @@ -90,9 +87,9 @@ class StreamingLinearRegressionSuite extends FunSuite { // Test that parameter estimates improve when learning Y = 10*X1 on streaming data test("streaming linear regression parameter convergence") { - val conf = new SparkConf().setMaster("local").setAppName("streaming test") val testDir = Files.createTempDir() - val ssc = new StreamingContext(conf, Seconds(2)) + val batchDuration = Milliseconds(2000) + val ssc = new StreamingContext(sc, batchDuration) val numBatches = 5 val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) val model = StreamingLinearRegressionWithSGD.start(numFeatures=1, numIterations=50) @@ -103,17 +100,17 @@ class StreamingLinearRegressionSuite extends FunSuite { // write data to a file stream val history = new ArrayBuffer[Double](numBatches) - Thread.sleep(5000) for (i <- 0 until numBatches) { val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) - Thread.sleep(Milliseconds(6000).milliseconds) + Thread.sleep(batchDuration.milliseconds) + // wait an extra few seconds to make sure the update finishes before new data arrive + Thread.sleep(4000) history.append(math.abs(model.latest().weights(0) - 10.0)) } - Thread.sleep(Milliseconds(5000).milliseconds) - ssc.stop() + ssc.stop(stopSparkContext=false) System.clearProperty("spark.driver.port") Utils.deleteRecursively(testDir) From c7d38a3a8713b3e626ef8b8d31f904e47715271c Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Thu, 31 Jul 2014 20:27:07 -0400 Subject: [PATCH 14/30] Move check for empty data to GradientDescent --- .../mllib/optimization/GradientDescent.scala | 84 +++++++++++-------- .../regression/StreamingRegression.scala | 6 +- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 7030eeabe400a..39b362ad1b2d2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -159,45 +159,57 @@ object GradientDescent extends Logging { val stochasticLossHistory = new ArrayBuffer[Double](numIterations) val numExamples = data.count() - val miniBatchSize = numExamples * miniBatchFraction - - // Initialize weights as a column vector - var weights = Vectors.dense(initialWeights.toArray) - - /** - * 
For the first iteration, the regVal will be initialized as sum of weight squares - * if it's L2 updater; for L1 updater, the same logic is followed. - */ - var regVal = updater.compute( - weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 - - for (i <- 1 to numIterations) { - // Sample a subset (fraction miniBatchFraction) of the total data - // compute and sum up the subgradients on this subset (this is one map-reduce) - val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) - .aggregate((BDV.zeros[Double](weights.size), 0.0))( - seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => - val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) - (grad, loss + l) - }, - combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => - (grad1 += grad2, loss1 + loss2) - }) + + // if no data, return initial weights to avoid NaNs + if (numExamples == 0) { + + logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found") + (initialWeights, stochasticLossHistory.toArray) + + } else { + + val miniBatchSize = numExamples * miniBatchFraction + + // Initialize weights as a column vector + var weights = Vectors.dense(initialWeights.toArray) /** - * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration - * and regVal is the regularization value computed in the previous iteration as well. + * For the first iteration, the regVal will be initialized as sum of weight squares + * if it's L2 updater; for L1 updater, the same logic is followed. */ - stochasticLossHistory.append(lossSum / miniBatchSize + regVal) - val update = updater.compute( - weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam) - weights = update._1 - regVal = update._2 + var regVal = updater.compute( + weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 + + for (i <- 1 to numIterations) { + // Sample a subset (fraction miniBatchFraction) of the total data + // compute and sum up the subgradients on this subset (this is one map-reduce) + val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) + .aggregate((BDV.zeros[Double](weights.size), 0.0))( + seqOp = (c, v) => (c, v) match { + case ((grad, loss), (label, features)) => + val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) + (grad, loss + l) + }, + combOp = (c1, c2) => (c1, c2) match { + case ((grad1, loss1), (grad2, loss2)) => + (grad1 += grad2, loss1 + loss2) + }) + + /** + * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration + * and regVal is the regularization value computed in the previous iteration as well. + */ + stochasticLossHistory.append(lossSum / miniBatchSize + regVal) + val update = updater.compute( + weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam) + weights = update._1 + regVal = update._2 + } + + logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + + (weights, stochasticLossHistory.toArray) } - - logInfo("GradientDescent.runMiniBatchSGD finished. 
Last 10 stochastic losses %s".format(
-      stochasticLossHistory.takeRight(10).mkString(", ")))
-
-    (weights, stochasticLossHistory.toArray)
   }
 }

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
index 9f65608572c9c..151e6b98f2429 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
@@ -60,10 +60,8 @@ abstract class StreamingRegression[
   def trainOn(data: DStream[LabeledPoint]) {
     data.foreachRDD{
       rdd =>
-        if (rdd.count() > 0) {
-          model = algorithm.run(rdd, model.weights)
-          logInfo("Model updated")
-        }
+        model = algorithm.run(rdd, model.weights)
+        logInfo("Model updated")
         logInfo("Current model: weights, %s".format(model.weights.toString))
         logInfo("Current model: intercept, %s".format(model.intercept.toString))
     }
   }

From 14b801e7c3b2c957b27fea423e7b3feb833fd0b9 Mon Sep 17 00:00:00 2001
From: Jeremy Freeman
Date: Thu, 31 Jul 2014 20:37:23 -0400
Subject: [PATCH 15/30] Name changes

---
 ...ression.scala => StreamingLinearAlgorithm.scala} | 13 ++++++-------
 .../regression/StreamingLinearRegression.scala      |  2 +-
 .../regression/StreamingLinearRegressionSuite.scala | 10 +++++-----
 3 files changed, 12 insertions(+), 13 deletions(-)
 rename mllib/src/main/scala/org/apache/spark/mllib/regression/{StreamingRegression.scala => StreamingLinearAlgorithm.scala} (89%)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
similarity index 89%
rename from mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
rename to mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index 151e6b98f2429..11dacb19fd51c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -23,9 +23,9 @@ import org.apache.spark.streaming.dstream.DStream
 
 /**
  * :: DeveloperApi ::
- * StreamingRegression implements methods for training
- * a linear regression model on streaming data, and using it
- * for prediction on streaming data.
+ * StreamingLinearAlgorithm implements methods for continuously
+ * training a generalized linear model on streaming data,
+ * and using it for prediction on streaming data.
  *
  * This class takes as type parameters a GeneralizedLinearModel,
  * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream.DStream
  *
  */
 @DeveloperApi
-abstract class StreamingRegression[
+abstract class StreamingLinearAlgorithm[
     M <: GeneralizedLinearModel,
     A <: GeneralizedLinearAlgorithm[M]] extends Logging {
 
@@ -45,7 +45,7 @@ abstract class StreamingRegression[
   /** The algorithm to use for updating. */
   val algorithm: A
 
   /** Return the latest model.
*/ - def latest(): M = { + def latestModel(): M = { model } @@ -58,8 +58,7 @@ abstract class StreamingRegression[ * @param data DStream containing labeled data */ def trainOn(data: DStream[LabeledPoint]) { - data.foreachRDD{ - rdd => + data.foreachRDD { rdd => model = algorithm.run(rdd, model.weights) logInfo("Model updated") logInfo("Current model: weights, %s".format(model.weights.toString)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index 7b16a94af913c..dc0ffc67f3dcc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -35,7 +35,7 @@ class StreamingLinearRegressionWithSGD private ( private var numIterations: Int, private var miniBatchFraction: Double, private var numFeatures: Int) - extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable { + extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD] with Serializable { val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 6775e9a7db28d..6a2200a548c58 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -75,13 +75,13 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { Utils.deleteRecursively(testDir) // check accuracy of final parameter estimates - assertEqual(model.latest().intercept, 0.0, 0.1) - assertEqual(model.latest().weights(0), 10.0, 0.1) - assertEqual(model.latest().weights(1), 10.0, 0.1) + assertEqual(model.latestModel().intercept, 0.0, 0.1) + assertEqual(model.latestModel().weights(0), 10.0, 0.1) + assertEqual(model.latestModel().weights(1), 10.0, 0.1) // check accuracy of predictions val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) - validatePrediction(validationData.map(row => model.latest().predict(row.features)), validationData) + validatePrediction(validationData.map(row => model.latestModel().predict(row.features)), validationData) } // Test that parameter estimates improve when learning Y = 10*X1 on streaming data @@ -107,7 +107,7 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { Thread.sleep(batchDuration.milliseconds) // wait an extra few seconds to make sure the update finishes before new data arrive Thread.sleep(4000) - history.append(math.abs(model.latest().weights(0) - 10.0)) + history.append(math.abs(model.latestModel().weights(0) - 10.0)) } ssc.stop(stopSparkContext=false) From 00aafdc55e27be42937b4c86c6e7e0ce77030297 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Thu, 31 Jul 2014 21:09:19 -0400 Subject: [PATCH 16/30] Add modifiers --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 11dacb19fd51c..e449742e0c2d6 100644 
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -39,10 +39,10 @@ abstract class StreamingLinearAlgorithm[ A <: GeneralizedLinearAlgorithm[M]] extends Logging { /** The model to be updated and used for prediction. */ - var model: M + protected var model: M /** The algorithm to use for updating. */ - val algorithm: A + protected val algorithm: A /** Return the latest model. */ def latestModel(): M = { From c3f8b5ab5655e020bceeded81d96fd3152f6edce Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 03:03:20 -0400 Subject: [PATCH 17/30] Modified logging --- .../mllib/regression/StreamingLinearAlgorithm.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index e449742e0c2d6..80235a8ece3a0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -32,6 +32,10 @@ import org.apache.spark.streaming.dstream.DStream * streaming versions of arbitrary regression analyses. For example usage, * see StreamingLinearRegressionWithSGD. * + * NOTE: Only weights will be updated, not an intercept. + * If the model needs an intercept, it should be manually appended + * to the input data. + * */ @DeveloperApi abstract class StreamingLinearAlgorithm[ @@ -52,17 +56,16 @@ abstract class StreamingLinearAlgorithm[ /** * Update the model by training on batches of data from a DStream. * This operation registers a DStream for training the model, - * and updates the model based on every subsequent non-empty + * and updates the model based on every subsequent * batch of data from the stream. 
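+ *
+ * (A hypothetical wiring sketch; `ssc` and `dir` are assumed to exist in the caller:
+ * {{{
+ *   model.trainOn(MLStreamingUtils.loadLabeledPointsFromText(ssc, dir))
+ * }}})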
* * @param data DStream containing labeled data */ def trainOn(data: DStream[LabeledPoint]) { - data.foreachRDD { rdd => + data.foreachRDD { (rdd, time) => model = algorithm.run(rdd, model.weights) - logInfo("Model updated") - logInfo("Current model: weights, %s".format(model.weights.toString)) - logInfo("Current model: intercept, %s".format(model.intercept.toString)) + logInfo("Model updated at time %s".format(time.toString)) + logInfo("Current model: weights, %s".format(model.weights.toArray.take(100).mkString("[", ",", "]"))) } } From b9b69f64e5dc8e8b2683ef1b94c9ba9172594a71 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 03:04:00 -0400 Subject: [PATCH 18/30] Added setter methods --- .../mllib/StreamingLinearRegression.scala | 15 +++-- .../StreamingLinearRegression.scala | 64 ++++++++++++++----- .../StreamingLinearRegressionSuite.scala | 18 ++++-- 3 files changed, 70 insertions(+), 27 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala index b4ec2c36e728d..e3e0d003f0019 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala @@ -17,14 +17,16 @@ package org.apache.spark.examples.mllib -import org.apache.spark.SparkConf -import org.apache.spark.mllib.util.MLStreamingUtils +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Continually update a model on one stream of data using streaming linear regression, - * while making predictions on another stream of data + * while making predictions on another stream of data. Assumes data arrive in the form + * of text files saved to a directory. * */ object StreamingLinearRegression { @@ -40,10 +42,11 @@ object StreamingLinearRegression { val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression") val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) - val trainingData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(0)) - val testData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(1)) + val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0)) + val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1)) - val model = StreamingLinearRegressionWithSGD.start(args(3).toInt) + val model = new StreamingLinearRegressionWithSGD() + .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0))) model.trainOn(trainingData) model.predictOn(testData).print() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index dc0ffc67f3dcc..491cbd64205d8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.regression import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Vector, Vectors} /** * Train or predict a linear regression model on streaming data. 
Training uses @@ -27,19 +27,51 @@ import org.apache.spark.mllib.linalg.Vectors * * Each batch of data is assumed to be an RDD of LabeledPoints. * The number of data points per batch can vary, but the number - * of features must be constant. + * of features must be constant. An initial weight + * vector must be provided. + * */ @Experimental class StreamingLinearRegressionWithSGD private ( private var stepSize: Double, private var numIterations: Int, private var miniBatchFraction: Double, - private var numFeatures: Int) + private var initialWeights: Vector) extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD] with Serializable { + /** + * Construct a StreamingLinearRegression object with default parameters: + * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}. + */ + def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0)) + val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction) - var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0) + var model = algorithm.createModel(initialWeights, 0.0) + + /** Set the step size for gradient descent. Default: 0.1. */ + def setStepSize(stepSize: Double): this.type = { + this.algorithm.optimizer.setStepSize(stepSize) + this + } + + /** Set the number of iterations of gradient descent to run per update. Default: 50. */ + def setNumIterations(numIterations: Int): this.type = { + this.algorithm.optimizer.setNumIterations(numIterations) + this + } + + /** Set the fraction of each batch to use for updates. Default: 1.0. */ + def setMiniBatchFraction(miniBatchFraction: Double): this.type = { + this.algorithm.optimizer.setMiniBatchFraction(miniBatchFraction) + this + } + + /** Set the initial weights. Default: [0.0, 0.0]. */ + def setInitialWeights(initialWeights: Vector): this.type = { + this.model = algorithm.createModel(initialWeights, 0.0) + this + } } @@ -55,14 +87,14 @@ object StreamingLinearRegressionWithSGD { * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. * @param miniBatchFraction Fraction of data to be used per iteration. - * @param numFeatures Number of features per record, must be constant for all batches of data. + * @param initialWeights Weights to initialize model with. */ def start( stepSize: Double, numIterations: Int, miniBatchFraction: Double, - numFeatures: Int): StreamingLinearRegressionWithSGD = { - new StreamingLinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, numFeatures) + initialWeights: Vector): StreamingLinearRegressionWithSGD = { + new StreamingLinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, initialWeights) } /** @@ -70,35 +102,35 @@ object StreamingLinearRegressionWithSGD { * * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. - * @param numFeatures Number of features per record, must be constant for all batches of data. + * @param initialWeights Weights to initialize model with. */ def start( numIterations: Int, stepSize: Double, - numFeatures: Int): StreamingLinearRegressionWithSGD = { - start(stepSize, numIterations, 1.0, numFeatures) + initialWeights: Vector): StreamingLinearRegressionWithSGD = { + start(stepSize, numIterations, 1.0, initialWeights) } /** * Start a streaming Linear Regression model by setting optimization parameters. 
* * @param numIterations Number of iterations of gradient descent to run. - * @param numFeatures Number of features per record, must be constant for all batches of data. + * @param initialWeights Weights to initialize model with. */ def start( numIterations: Int, - numFeatures: Int): StreamingLinearRegressionWithSGD = { - start(0.1, numIterations, 1.0, numFeatures) + initialWeights: Vector): StreamingLinearRegressionWithSGD = { + start(0.1, numIterations, 1.0, initialWeights) } /** * Start a streaming Linear Regression model by setting optimization parameters. * - * @param numFeatures Number of features per record, must be constant for all batches of data. + * @param initialWeights Weights to initialize model with. */ def start( - numFeatures: Int): StreamingLinearRegressionWithSGD = { - start(0.1, 100, 1.0, numFeatures) + initialWeights: Vector): StreamingLinearRegressionWithSGD = { + start(0.1, 100, 1.0, initialWeights) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 6a2200a548c58..ad8c1f07bc38d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.ArrayBuffer import com.google.common.io.Files import org.scalatest.FunSuite -import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext} +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils} import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.util.Utils @@ -54,8 +55,12 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { val numBatches = 10 val batchDuration = Milliseconds(1000) val ssc = new StreamingContext(sc, batchDuration) - val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) - val model = StreamingLinearRegressionWithSGD.start(numFeatures=2, numIterations=50) + val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString) + //val model = StreamingLinearRegressionWithSGD.start(initialWeights=Vectors.dense(0.0), numIterations=50) + val model = new StreamingLinearRegressionWithSGD() + .setInitialWeights(Vectors.dense(0.0, 0.0)) + .setStepSize(0.1) + .setNumIterations(50) model.trainOn(data) @@ -91,8 +96,11 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { val batchDuration = Milliseconds(2000) val ssc = new StreamingContext(sc, batchDuration) val numBatches = 5 - val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString) - val model = StreamingLinearRegressionWithSGD.start(numFeatures=1, numIterations=50) + val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString) + val model = new StreamingLinearRegressionWithSGD() + .setInitialWeights(Vectors.dense(0.0)) + .setStepSize(0.1) + .setNumIterations(50) model.trainOn(data) From 7d513789e40db2e12e693af9248255979287dd30 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 03:04:20 -0400 Subject: [PATCH 19/30] Moved streaming loader to MLUtils --- .../spark/mllib/util/MLStreamingUtils.scala | 42 ------------------- .../org/apache/spark/mllib/util/MLUtils.scala | 13 ++++++ 2 files changed, 13 insertions(+), 42 deletions(-) delete mode 100644 
mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala deleted file mode 100644 index b90adca8b24b5..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.util - -import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.StreamingContext - -/** - * Helper methods to load streaming data for MLLib applications. - */ -@Experimental -object MLStreamingUtils { - - /** - * Loads streaming labeled points from a stream of text files - * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`. - * - * @param ssc Streaming context - * @param path Directory path in any Hadoop-supported file system URI - * @return Labeled points stored as a DStream[LabeledPoint] - */ - def loadLabeledPointsFromText(ssc: StreamingContext, path: String): DStream[LabeledPoint] = - ssc.textFileStream(path).map(LabeledPointParser.parse) - -} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index aaf92a1a8869a..58074b71e08b9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -30,6 +30,8 @@ import org.apache.spark.util.random.BernoulliSampler import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.DStream /** * Helper methods to load, save and pre-process data used in ML Lib. @@ -212,6 +214,17 @@ object MLUtils { def loadLabeledPoints(sc: SparkContext, dir: String): RDD[LabeledPoint] = loadLabeledPoints(sc, dir, sc.defaultMinPartitions) + /** + * Loads streaming labeled points from a stream of text files + * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`. + * + * @param ssc Streaming context + * @param dir Directory path in any Hadoop-supported file system URI + * @return Labeled points stored as a DStream[LabeledPoint] + */ + def loadStreamingLabeledPoints(ssc: StreamingContext, dir: String): DStream[LabeledPoint] = + ssc.textFileStream(dir).map(LabeledPointParser.parse) + /** * Load labeled data from a file. The data format used here is * , ... 
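A minimal sketch of how the pieces introduced so far compose into a complete application: the loader now in MLUtils feeding the setter-based estimator from the earlier patch. The directory names, app name, and batch interval below are placeholders, not part of any patch:

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local").setAppName("StreamingRegressionSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // each new text file of labeled points in a monitored directory becomes one batch
    val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, "trainingDir")
    val testData = MLUtils.loadStreamingLabeledPoints(ssc, "testDir")

    // the initial weight vector also fixes the feature dimension (two features here)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))

    model.trainOn(trainingData)          // update the model as training batches arrive
    model.predictOn(testData).print()    // predict on the other stream with the latest model

    ssc.start()
    ssc.awaitTermination()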
From 2fe0720db3c946b94b91f8b15e099f90a332a219 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 03:15:41 -0400 Subject: [PATCH 20/30] Minor cleanup --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 2 +- .../mllib/regression/StreamingLinearRegression.scala | 8 ++++---- .../mllib/regression/StreamingLinearRegressionSuite.scala | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 80235a8ece3a0..5ee1d85e05c48 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -29,7 +29,7 @@ import org.apache.spark.streaming.dstream.DStream * * This class takes as type parameters a GeneralizedLinearModel, * and a GeneralizedLinearAlgorithm, making it easy to extend to construct - * streaming versions of arbitrary regression analyses. For example usage, + * streaming versions of any analyses using GLMs. For example usage, * see StreamingLinearRegressionWithSGD. * * NOTE: Only weights will be updated, not an intercept. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index 491cbd64205d8..affe27ab60809 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -84,8 +84,8 @@ object StreamingLinearRegressionWithSGD { /** * Start a streaming Linear Regression model by setting optimization parameters. * - * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. + * @param numIterations Number of iterations of gradient descent to run. * @param miniBatchFraction Fraction of data to be used per iteration. * @param initialWeights Weights to initialize model with. */ @@ -100,13 +100,13 @@ object StreamingLinearRegressionWithSGD { /** * Start a streaming Linear Regression model by setting optimization parameters. * - * @param numIterations Number of iterations of gradient descent to run. * @param stepSize Step size to be used for each iteration of gradient descent. + * @param numIterations Number of iterations of gradient descent to run. * @param initialWeights Weights to initialize model with. 
*/ def start( - numIterations: Int, stepSize: Double, + numIterations: Int, initialWeights: Vector): StreamingLinearRegressionWithSGD = { start(stepSize, numIterations, 1.0, initialWeights) } @@ -130,7 +130,7 @@ object StreamingLinearRegressionWithSGD { */ def start( initialWeights: Vector): StreamingLinearRegressionWithSGD = { - start(0.1, 100, 1.0, initialWeights) + start(0.1, 50, 1.0, initialWeights) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index ad8c1f07bc38d..fddd2c47d03fa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -56,7 +56,6 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { val batchDuration = Milliseconds(1000) val ssc = new StreamingContext(sc, batchDuration) val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString) - //val model = StreamingLinearRegressionWithSGD.start(initialWeights=Vectors.dense(0.0), numIterations=50) val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.1) From 66eba5eb543618b73eb8707daaec95c79bfff9f9 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 11:17:41 -0400 Subject: [PATCH 21/30] Fixed line lengths --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 3 ++- .../spark/mllib/regression/StreamingLinearRegression.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 5ee1d85e05c48..16bcdc4ca2138 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -65,7 +65,8 @@ abstract class StreamingLinearAlgorithm[ data.foreachRDD { (rdd, time) => model = algorithm.run(rdd, model.weights) logInfo("Model updated at time %s".format(time.toString)) - logInfo("Current model: weights, %s".format(model.weights.toArray.take(100).mkString("[", ",", "]"))) + logInfo("Current model: weights, %s".format( + model.weights.toArray.take(100).mkString("[", ",", "]"))) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala index affe27ab60809..4cc3fab70522e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala @@ -37,7 +37,8 @@ class StreamingLinearRegressionWithSGD private ( private var numIterations: Int, private var miniBatchFraction: Double, private var initialWeights: Vector) - extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD] with Serializable { + extends StreamingLinearAlgorithm[ + LinearRegressionModel, LinearRegressionWithSGD] with Serializable { /** * Construct a StreamingLinearRegression object with default parameters: From c3326e79bcddf82353c2e2f3e825cd123280aa90 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 14:57:02 -0400 Subject: [PATCH 22/30] Improved documentation 
---
 .../mllib/StreamingLinearRegression.scala     | 22 +++++++++++++++----
 .../org/apache/spark/mllib/util/MLUtils.scala |  2 ++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
index e3e0d003f0019..1fd37edfa7427 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -24,9 +24,23 @@ import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 /**
- * Continually update a model on one stream of data using streaming linear regression,
- * while making predictions on another stream of data. Assumes data arrive in the form
- * of text files saved to a directory.
+ * Train a linear regression model on one stream of data and make predictions
+ * on another stream, where the data streams arrive as text files
+ * into two different directories.
+ *
+ * The rows of the text files must be labeled data points in the form
+ * `(y,[x1,x2,x3,...,xn])`
+ * where n is the number of features. n must be the same for train and test.
+ *
+ * Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
+ *
+ * To run on your local machine using the two directories `trainingDir` and `testDir`,
+ * with updates every 5 seconds, and 2 features per data point, call:
+ *    $ bin/run-example \
+ *        org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2
+ *
+ * As you add text files to `trainingDir` the model will continuously update.
+ * Anytime you add text files to `testDir`, you'll see predictions from the current model.
  *
  */
 object StreamingLinearRegression {
@@ -35,7 +49,7 @@ object StreamingLinearRegression {
 
     if (args.length != 4) {
       System.err.println(
-        "Usage: StreamingLinearRegression ")
+        "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
       System.exit(1)
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 45ff42160364d..f4cce86a65ba7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -197,6 +197,8 @@ object MLUtils {
   /**
    * Loads streaming labeled points from a stream of text files
    * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
+   * See `StreamingContext.textFileStream` for more details on how to
+   * generate a stream from files.
    *
    * @param ssc Streaming context
    * @param dir Directory path in any Hadoop-supported file system URI
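The `(y,[x1,x2,...])` rows documented above are exactly LabeledPoint.toString output, so input files can be produced the same way the test suite does. A small sketch, with `trainingDir` a placeholder for the directory the example monitors; note that textFileStream only picks up files created after the stream starts, so each batch should be written as a fresh file:

    import java.io.File
    import java.nio.charset.Charset
    import com.google.common.io.Files

    // two labeled points with two features each, in (y,[x1,x2]) form
    val lines = Seq("(1.5,[0.4,2.1])", "(0.3,[1.0,0.2])")

    // write one fresh file per batch into the monitored directory
    Files.write(lines.mkString("\n"), new File("trainingDir", "batch-0"), Charset.forName("UTF-8"))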
From d28cf9a350726ec57c5e214dbc10faebd0b04c1e Mon Sep 17 00:00:00 2001
From: Jeremy Freeman
Date: Fri, 1 Aug 2014 18:40:29 -0400
Subject: [PATCH 23/30] Added usage notes

---
 .../regression/StreamingLinearAlgorithm.scala | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index 16bcdc4ca2138..a1df18867550c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -25,17 +25,28 @@ import org.apache.spark.streaming.dstream.DStream
  * :: DeveloperApi ::
  * StreamingLinearAlgorithm implements methods for continuously
  * training a generalized linear model on streaming data,
- * and using it for prediction on streaming data.
+ * and using it for prediction on (possibly different) streaming data.
  *
  * This class takes as type parameters a GeneralizedLinearModel,
  * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
- * streaming versions of any analyses using GLMs. For example usage,
- * see StreamingLinearRegressionWithSGD.
- *
- * NOTE: Only weights will be updated, not an intercept.
- * If the model needs an intercept, it should be manually appended
- * to the input data.
+ * streaming versions of any analyses using GLMs. Only weights will be updated,
+ * not an intercept. If the model needs an intercept, it should be manually appended
+ * to the input data.
+ *
+ * For example usage, see `StreamingLinearRegressionWithSGD`.
+ *
+ * NOTE(Freeman): In some use cases, the order in which trainOn and predictOn
+ * are called in an application will affect the results. When called on
+ * the same DStream, if trainOn is called before predictOn, when new data
+ * arrive the model will update and the prediction will be based on the new
+ * model, whereas if predictOn is called first, the prediction will use the
+ * model from the previous update.
+ *
+ * NOTE(Freeman): It is ok to call predictOn repeatedly on multiple streams; this
+ * will generate predictions for each one, all using the current model.
+ * It is also ok to call trainOn on different streams; this will update
+ * the model using each of the different sources, in sequence.
+ * */ @DeveloperApi abstract class StreamingLinearAlgorithm[ From 74cf44094228f4e864247292374ced30262da80c Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 18:45:45 -0400 Subject: [PATCH 24/30] Removed static methods - Also deleted companion object - Renamed file for consistency - Explained usage in documentation --- ...=> StreamingLinearRegressionWithSGD.scala} | 73 +++---------------- 1 file changed, 11 insertions(+), 62 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/regression/{StreamingLinearRegression.scala => StreamingLinearRegressionWithSGD.scala} (57%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala similarity index 57% rename from mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala rename to mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 4cc3fab70522e..f034b20242b16 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -23,16 +23,25 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors} /** * Train or predict a linear regression model on streaming data. Training uses * Stochastic Gradient Descent to update the model based on each new batch of - * incoming data from a DStream (see LinearRegressionWithSGD for model equation) + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation) * * Each batch of data is assumed to be an RDD of LabeledPoints. * The number of data points per batch can vary, but the number * of features must be constant. An initial weight * vector must be provided. * + * Use a builder pattern to construct a streaming linear regression + * analysis in an application, like: + * + * model = new StreamingLinearRegressionWithSGD() + * .setStepSize(0.5) + * .setNumIterations(10) + * .setInitialWeights(Vectors.dense(...)) + * .trainOn(DStream) + * */ @Experimental -class StreamingLinearRegressionWithSGD private ( +class StreamingLinearRegressionWithSGD ( private var stepSize: Double, private var numIterations: Int, private var miniBatchFraction: Double, @@ -75,63 +84,3 @@ class StreamingLinearRegressionWithSGD private ( } } - -/** - * Top-level methods for calling StreamingLinearRegressionWithSGD. - */ -@Experimental -object StreamingLinearRegressionWithSGD { - - /** - * Start a streaming Linear Regression model by setting optimization parameters. - * - * @param stepSize Step size to be used for each iteration of gradient descent. - * @param numIterations Number of iterations of gradient descent to run. - * @param miniBatchFraction Fraction of data to be used per iteration. - * @param initialWeights Weights to initialize model with. - */ - def start( - stepSize: Double, - numIterations: Int, - miniBatchFraction: Double, - initialWeights: Vector): StreamingLinearRegressionWithSGD = { - new StreamingLinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction, initialWeights) - } - - /** - * Start a streaming Linear Regression model by setting optimization parameters. - * - * @param stepSize Step size to be used for each iteration of gradient descent. - * @param numIterations Number of iterations of gradient descent to run. - * @param initialWeights Weights to initialize model with. 
- */ - def start( - stepSize: Double, - numIterations: Int, - initialWeights: Vector): StreamingLinearRegressionWithSGD = { - start(stepSize, numIterations, 1.0, initialWeights) - } - - /** - * Start a streaming Linear Regression model by setting optimization parameters. - * - * @param numIterations Number of iterations of gradient descent to run. - * @param initialWeights Weights to initialize model with. - */ - def start( - numIterations: Int, - initialWeights: Vector): StreamingLinearRegressionWithSGD = { - start(0.1, numIterations, 1.0, initialWeights) - } - - /** - * Start a streaming Linear Regression model by setting optimization parameters. - * - * @param initialWeights Weights to initialize model with. - */ - def start( - initialWeights: Vector): StreamingLinearRegressionWithSGD = { - start(0.1, 50, 1.0, initialWeights) - } - -} From 777b5964650263f2e7567b5d6853858e51727d96 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 19:15:29 -0400 Subject: [PATCH 25/30] Restored treeAggregate --- .../org/apache/spark/mllib/optimization/GradientDescent.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index dad9dbbf75587..a74012153556f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -173,6 +173,7 @@ object GradientDescent extends Logging { // Initialize weights as a column vector var weights = Vectors.dense(initialWeights.toArray) val n = weights.size + .treeAggregate((BDV.zeros[Double](weights.size), 0.0))( /** * For the first iteration, the regVal will be initialized as sum of weight squares From 8711c4108fb357e3e2e7fe3c5f706bdb0a69165b Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 19:27:39 -0400 Subject: [PATCH 26/30] Used return to avoid indentation --- .../mllib/optimization/GradientDescent.scala | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index a74012153556f..08cf4bea98de4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -166,52 +166,51 @@ object GradientDescent extends Logging { if (numExamples == 0) { logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found") - (initialWeights, stochasticLossHistory.toArray) + return (initialWeights, stochasticLossHistory.toArray) - } else { + } - // Initialize weights as a column vector - var weights = Vectors.dense(initialWeights.toArray) - val n = weights.size + // Initialize weights as a column vector + var weights = Vectors.dense(initialWeights.toArray) + val n = weights.size + + /** + * For the first iteration, the regVal will be initialized as sum of weight squares + * if it's L2 updater; for L1 updater, the same logic is followed. 
+ */ + var regVal = updater.compute( + weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 + + for (i <- 1 to numIterations) { + // Sample a subset (fraction miniBatchFraction) of the total data + // compute and sum up the subgradients on this subset (this is one map-reduce) + val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](weights.size), 0.0))( + seqOp = (c, v) => (c, v) match { + case ((grad, loss), (label, features)) => + val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) + (grad, loss + l) + }, + combOp = (c1, c2) => (c1, c2) match { + case ((grad1, loss1), (grad2, loss2)) => + (grad1 += grad2, loss1 + loss2) + }) /** - * For the first iteration, the regVal will be initialized as sum of weight squares - * if it's L2 updater; for L1 updater, the same logic is followed. + * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration + * and regVal is the regularization value computed in the previous iteration as well. */ - var regVal = updater.compute( - weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 - - for (i <- 1 to numIterations) { - // Sample a subset (fraction miniBatchFraction) of the total data - // compute and sum up the subgradients on this subset (this is one map-reduce) - val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) - .aggregate((BDV.zeros[Double](weights.size), 0.0))( - seqOp = (c, v) => (c, v) match { - case ((grad, loss), (label, features)) => - val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) - (grad, loss + l) - }, - combOp = (c1, c2) => (c1, c2) match { - case ((grad1, loss1), (grad2, loss2)) => - (grad1 += grad2, loss1 + loss2) - }) - - /** - * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration - * and regVal is the regularization value computed in the previous iteration as well. - */ - stochasticLossHistory.append(lossSum / miniBatchSize + regVal) - val update = updater.compute( - weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam) - weights = update._1 - regVal = update._2 - } - - logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( - stochasticLossHistory.takeRight(10).mkString(", "))) - - (weights, stochasticLossHistory.toArray) + stochasticLossHistory.append(lossSum / miniBatchSize + regVal) + val update = updater.compute( + weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam) + weights = update._1 + regVal = update._2 } + + logInfo("GradientDescent.runMiniBatchSGD finished. 
Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + + (weights, stochasticLossHistory.toArray) + } } From 29f27ecd68996c52d48b2416386d1d50cfca236a Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 19:30:34 -0400 Subject: [PATCH 27/30] Formatting --- .../mllib/regression/StreamingLinearRegressionWithSGD.scala | 2 +- .../mllib/regression/StreamingLinearRegressionSuite.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index f034b20242b16..0c06215f4aac5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -33,7 +33,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors} * Use a builder pattern to construct a streaming linear regression * analysis in an application, like: * - * model = new StreamingLinearRegressionWithSGD() + * val model = new StreamingLinearRegressionWithSGD() * .setStepSize(0.5) * .setNumIterations(10) * .setInitialWeights(Vectors.dense(...)) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index fddd2c47d03fa..ed21f84472c9a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -67,7 +67,8 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { // write data to a file stream for (i <- 0 until numBatches) { - val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) + val samples = LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) val file = new File(testDir, i.toString) Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) Thread.sleep(batchDuration.milliseconds) @@ -85,7 +86,8 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { // check accuracy of predictions val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) - validatePrediction(validationData.map(row => model.latestModel().predict(row.features)), validationData) + validatePrediction(validationData.map(row => model.latestModel().predict(row.features)), + validationData) } // Test that parameter estimates improve when learning Y = 10*X1 on streaming data From 8b95b270341089045b02f5e1e7cbf904bd4c724a Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 19:47:38 -0400 Subject: [PATCH 28/30] Restored broadcasting --- .../mllib/optimization/GradientDescent.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 08cf4bea98de4..a6912056395d7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -182,18 +182,17 @@ object GradientDescent extends Logging { weights, 
Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2 for (i <- 1 to numIterations) { + val bcWeights = data.context.broadcast(weights) // Sample a subset (fraction miniBatchFraction) of the total data // compute and sum up the subgradients on this subset (this is one map-reduce) val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i) - .treeAggregate((BDV.zeros[Double](weights.size), 0.0))( - seqOp = (c, v) => (c, v) match { - case ((grad, loss), (label, features)) => - val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad)) - (grad, loss + l) + .treeAggregate((BDV.zeros[Double](n), 0.0))( + seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) => + val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad)) + (grad, loss + l) }, - combOp = (c1, c2) => (c1, c2) match { - case ((grad1, loss1), (grad2, loss2)) => - (grad1 += grad2, loss1 + loss2) + combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) => + (grad1 += grad2, loss1 + loss2) }) /** From 4086fee0ffc4d4d8c4f028ee1f38933ad1f42c9a Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 21:32:06 -0400 Subject: [PATCH 29/30] Fixed current weight formatting --- .../spark/mllib/regression/StreamingLinearAlgorithm.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index a1df18867550c..41cdfac9cc4e3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -78,6 +78,11 @@ abstract class StreamingLinearAlgorithm[ logInfo("Model updated at time %s".format(time.toString)) logInfo("Current model: weights, %s".format( model.weights.toArray.take(100).mkString("[", ",", "]"))) + val display = model.weights.size match { + case x if x > 100 => model.weights.toArray.take(100).mkString("[", ",", "...") + case _ => model.weights.toArray.mkString("[", ",", "]") + } + logInfo("Current model: weights, %s".format (display)) } } From 775ea29e53a7067ff6e143b455b49ddbb8553d94 Mon Sep 17 00:00:00 2001 From: Jeremy Freeman Date: Fri, 1 Aug 2014 21:50:24 -0400 Subject: [PATCH 30/30] Throw error if user doesn't initialize weights --- .../regression/StreamingLinearAlgorithm.scala | 17 ++++++++++++----- .../StreamingLinearRegressionWithSGD.scala | 6 ++++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index 41cdfac9cc4e3..b8b0b42611775 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -29,9 +29,10 @@ import org.apache.spark.streaming.dstream.DStream * * This class takes as type parameters a GeneralizedLinearModel, * and a GeneralizedLinearAlgorithm, making it easy to extend to construct - * streaming versions of any analyses using GLMs. Only weights will be updated, - * not an intercept. If the model needs an intercept, it should be manually appended - * to the input data. + * streaming versions of any analyses using GLMs. + * Initial weights must be set before calling trainOn or predictOn. 
+ * Only weights will be updated, not an intercept. If the model needs + * an intercept, it should be manually appended to the input data. * * For example usage, see `StreamingLinearRegressionWithSGD`. * @@ -73,11 +74,13 @@ abstract class StreamingLinearAlgorithm[ * @param data DStream containing labeled data */ def trainOn(data: DStream[LabeledPoint]) { + if (Option(model.weights) == None) { + logError("Initial weights must be set before starting training") + throw new IllegalArgumentException + } data.foreachRDD { (rdd, time) => model = algorithm.run(rdd, model.weights) logInfo("Model updated at time %s".format(time.toString)) - logInfo("Current model: weights, %s".format( - model.weights.toArray.take(100).mkString("[", ",", "]"))) val display = model.weights.size match { case x if x > 100 => model.weights.toArray.take(100).mkString("[", ",", "...") case _ => model.weights.toArray.mkString("[", ",", "]") @@ -93,6 +96,10 @@ abstract class StreamingLinearAlgorithm[ * @return DStream containing predictions */ def predictOn(data: DStream[LabeledPoint]): DStream[Double] = { + if (Option(model.weights) == None) { + logError("Initial weights must be set before starting prediction") + throw new IllegalArgumentException + } data.map(x => model.predict(x.features)) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala index 0c06215f4aac5..8851097050318 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala @@ -51,9 +51,11 @@ class StreamingLinearRegressionWithSGD ( /** * Construct a StreamingLinearRegression object with default parameters: - * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}. + * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0}. + * Initial weights must be set before using trainOn or predictOn + * (see `StreamingLinearAlgorithm`) */ - def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0)) + def this() = this(0.1, 50, 1.0, null) val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
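Taken together, the contract after this final patch is: construct, set initial weights explicitly, then attach streams. A closing sketch of that calling pattern; the two DStream parameters are placeholders, and everything else follows the code above:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
    import org.apache.spark.streaming.dstream.DStream

    def attach(trainingStream: DStream[LabeledPoint], testStream: DStream[LabeledPoint]) {
      val model = new StreamingLinearRegressionWithSGD()  // weights now default to null
        .setInitialWeights(Vectors.dense(0.0, 0.0))       // required before trainOn/predictOn
        .setStepSize(0.1)
        .setNumIterations(50)

      // per the patch 23 notes, when training and predicting on the same stream,
      // registering trainOn first means predictions use the just-updated weights
      model.trainOn(trainingStream)
      model.predictOn(testStream).print()

      // skipping setInitialWeights would instead log an error and throw an
      // IllegalArgumentException from trainOn or predictOn
    }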