From 9b8dd56663fae556549a61f265996cbd3414f35e Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 3 Mar 2014 18:30:30 +0800 Subject: [PATCH 1/3] add new optimizer for GradientDescent, with local updater --- .../mllib/optimization/GradientDescent.scala | 8 +- .../GradientDescentWithLocalUpdate.scala | 147 ++++++++++++++++++ .../GradientDescentWithLocalUpdateSuite.scala | 71 +++++++++ 3 files changed, 222 insertions(+), 4 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 8e87b98bac061..33de9604e0c2b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -32,10 +32,10 @@ import scala.collection.mutable.ArrayBuffer class GradientDescent(var gradient: Gradient, var updater: Updater) extends Optimizer with Logging { - private var stepSize: Double = 1.0 - private var numIterations: Int = 100 - private var regParam: Double = 0.0 - private var miniBatchFraction: Double = 1.0 + protected var stepSize: Double = 1.0 + protected var numIterations: Int = 100 + protected var regParam: Double = 0.0 + protected var miniBatchFraction: Double = 1.0 /** * Set the initial step size of SGD for the first step. Default 1.0. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala new file mode 100644 index 0000000000000..e4e682784ca38 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD + +import org.jblas.DoubleMatrix + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +/** + * Class used to solve an optimization problem using Gradient Descent. + * @param gradient Gradient function to be used. + * @param updater Updater to be used to update weights after every iteration. + */ +class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater) + extends GradientDescent(gradient, updater) with Logging +{ + private var numLocalIterations: Int = 1 + + /** + * Set the number of local iterations. Default 1. + */ + def setNumLocalIterations(numLocalIter: Int): this.type = { + this.numLocalIterations = numLocalIter + this + } + + override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]) + : Array[Double] = { + + val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD( + data, + gradient, + updater, + stepSize, + numIterations, + numLocalIterations, + regParam, + miniBatchFraction, + initialWeights) + weights + } + +} + +// Top-level method to run gradient descent. +object GradientDescentWithLocalUpdate extends Logging { + /** + * Run BSP+ gradient descent in parallel using mini batches. + * + * @param data - Input data for SGD. RDD of form (label, [feature values]). + * @param gradient - Gradient object that will be used to compute the gradient. + * @param updater - Updater object that will be used to update the model. + * @param stepSize - stepSize to be used during update. + * @param numOuterIterations - number of outer iterations that SGD should be run. + * @param numInnerIterations - number of inner iterations that SGD should be run. + * @param regParam - regularization parameter + * @param miniBatchFraction - fraction of the input data set that should be used for + * one iteration of SGD. Default value 1.0. + * + * @return A tuple containing two elements. The first element is a column matrix containing + * weights for every feature, and the second element is an array containing the stochastic + * loss computed for every iteration. + */ + def runMiniBatchSGD( + data: RDD[(Double, Array[Double])], + gradient: Gradient, + updater: Updater, + stepSize: Double, + numOuterIterations: Int, + numInnerIterations: Int, + regParam: Double, + miniBatchFraction: Double, + initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { + + val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations) + + val numExamples: Long = data.count() + val numPartition = data.partitions.length + val miniBatchSize = numExamples * miniBatchFraction / numPartition + + // Initialize weights as a column vector + var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*) + var regVal = 0.0 + + for (i <- 1 to numOuterIterations) { + val weightsAndLosses = data.mapPartitions { iter => + var iterReserved = iter + val localLossHistory = new ArrayBuffer[Double](numInnerIterations) + + for (j <- 1 to numInnerIterations) { + val (iterCurrent, iterNext) = iterReserved.duplicate + val rand = new Random(42 + i * numOuterIterations + j) + val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction) + val (gradientSum, lossSum) = sampled.map { case (y, features) => + val featuresCol = new DoubleMatrix(features.length, 1, features: _*) + val (grad, loss) = gradient.compute(featuresCol, y, weights) + (grad, loss) + }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) + + localLossHistory += lossSum / miniBatchSize + regVal + + val update = updater.compute(weights, gradientSum.div(miniBatchSize), + stepSize, (i - 1) + numOuterIterations + j, regParam) + + weights = update._1 + regVal = update._2 + + iterReserved = iterNext + } + + List((weights, localLossHistory.toArray)).iterator + } + + val c = weightsAndLosses.collect() + val (ws, ls) = c.unzip + + stochasticLossHistory.append(ls.head.reduce(_ + _) / ls.head.size) + + val weightsSum = ws.reduce(_ addi _) + weights = weightsSum.divi(c.size) + } + + logInfo("GradientDescentWithLocalUpdate finished. Last 10 stochastic losses %s".format( + stochasticLossHistory.takeRight(10).mkString(", "))) + + (weights.toArray, stochasticLossHistory.toArray) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala new file mode 100644 index 0000000000000..97a0d10400849 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.mllib.regression._ +import org.apache.spark.mllib.util.LocalSparkContext + +class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + + import GradientDescentSuite._ + + test("Assert the loss is decreasing.") { + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new LogisticGradient() + val updater = new SimpleUpdater() + val stepSize = 1.0 + val numIterations = 10 + val numLocalIterations = 10 + val regParam = 0 + val miniBatchFrac = 1.0 + + // Add a extra variable consisting of all 1.0's for the intercept. + val testData = generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Array(1.0, features: _*) + } + + val dataRDD = sc.parallelize(data, 2).cache() + val initialWeightsWithIntercept = Array(1.0, initialWeights: _*) + + val (_, loss) = GradientDescentWithLocalUpdate.runMiniBatchSGD( + dataRDD, + gradient, + updater, + stepSize, + numIterations, + numLocalIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(loss.last - loss.head < 0, "loss isn't decreasing.") + + val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs } + assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8) + } +} From e16ee908240f1bbde8161dfc2e19c1b267bb6f1b Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 19 Mar 2014 16:05:00 +0800 Subject: [PATCH 2/3] fix code style and comments --- .../mllib/optimization/GradientDescent.scala | 11 ++++---- .../GradientDescentWithLocalUpdate.scala | 25 +++++++++++-------- .../GradientDescentWithLocalUpdateSuite.scala | 2 +- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 844c25d1736fc..fde5a05219d18 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -30,12 +30,11 @@ import scala.collection.mutable.ArrayBuffer * @param updater Updater to be used to update weights after every iteration. */ class GradientDescent(var gradient: Gradient, var updater: Updater) - extends Optimizer with Logging -{ - protected var stepSize: Double = 1.0 - protected var numIterations: Int = 100 - protected var regParam: Double = 0.0 - protected var miniBatchFraction: Double = 1.0 + extends Optimizer with Logging { + protected var stepSize: Double = 1.0 + protected var numIterations: Int = 100 + protected var regParam: Double = 0.0 + protected var miniBatchFraction: Double = 1.0 /** * Set the initial step size of SGD for the first step. Default 1.0. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala index e4e682784ca38..96e211b8f3211 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala @@ -17,13 +17,15 @@ package org.apache.spark.mllib.optimization +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.jblas.DoubleMatrix + import org.apache.spark.Logging import org.apache.spark.rdd.RDD -import org.jblas.DoubleMatrix -import scala.collection.mutable.ArrayBuffer -import scala.util.Random /** * Class used to solve an optimization problem using Gradient Descent. @@ -31,8 +33,7 @@ import scala.util.Random * @param updater Updater to be used to update weights after every iteration. */ class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater) - extends GradientDescent(gradient, updater) with Logging -{ + extends GradientDescent(gradient, updater) with Logging { private var numLocalIterations: Int = 1 /** @@ -64,7 +65,9 @@ class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater) // Top-level method to run gradient descent. object GradientDescentWithLocalUpdate extends Logging { /** - * Run BSP+ gradient descent in parallel using mini batches. + * Run gradient descent with local update in parallel using mini batches. Unlike the + * [[GradientDescent]], here gradient descent takes place not only among jobs, but also inner + * jobs, i.e. on an executor. * * @param data - Input data for SGD. RDD of form (label, [feature values]). * @param gradient - Gradient object that will be used to compute the gradient. @@ -89,7 +92,7 @@ object GradientDescentWithLocalUpdate extends Logging { numInnerIterations: Int, regParam: Double, miniBatchFraction: Double, - initialWeights: Array[Double]) : (Array[Double], Array[Double]) = { + initialWeights: Array[Double]): (Array[Double], Array[Double]) = { val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations) @@ -112,9 +115,9 @@ object GradientDescentWithLocalUpdate extends Logging { val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction) val (gradientSum, lossSum) = sampled.map { case (y, features) => val featuresCol = new DoubleMatrix(features.length, 1, features: _*) - val (grad, loss) = gradient.compute(featuresCol, y, weights) - (grad, loss) - }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) + gradient.compute(featuresCol, y, weights) + }.reduceOption((a, b) => (a._1.addi(b._1), a._2 + b._2)) + .getOrElse((DoubleMatrix.zeros(0), 0.0)) localLossHistory += lossSum / miniBatchSize + regVal @@ -139,7 +142,7 @@ object GradientDescentWithLocalUpdate extends Logging { weights = weightsSum.divi(c.size) } - logInfo("GradientDescentWithLocalUpdate finished. Last 10 stochastic losses %s".format( + logInfo("GradientDescentWithLocalUpdate finished. Last a few stochastic losses %s".format( stochasticLossHistory.takeRight(10).mkString(", "))) (weights.toArray, stochasticLossHistory.toArray) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala index 97a0d10400849..a8998c0c6f586 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala @@ -23,7 +23,7 @@ import org.scalatest.matchers.ShouldMatchers import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.LocalSparkContext -class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext with ShouldMatchers { +class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext { import GradientDescentSuite._ From 5f8220a343a6ebbeff907745c6b29f64c3610620 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 20 Mar 2014 08:38:56 +0800 Subject: [PATCH 3/3] modify local update --- .../GradientDescentWithLocalUpdate.scala | 77 +++++++++++-------- .../GradientDescentWithLocalUpdateSuite.scala | 13 ++-- 2 files changed, 52 insertions(+), 38 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala index 96e211b8f3211..ae4a27eceb920 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdate.scala @@ -56,6 +56,7 @@ class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater) numLocalIterations, regParam, miniBatchFraction, + 20, initialWeights) weights } @@ -73,8 +74,8 @@ object GradientDescentWithLocalUpdate extends Logging { * @param gradient - Gradient object that will be used to compute the gradient. * @param updater - Updater object that will be used to update the model. * @param stepSize - stepSize to be used during update. - * @param numOuterIterations - number of outer iterations that SGD should be run. - * @param numInnerIterations - number of inner iterations that SGD should be run. + * @param numIterations - number of outer iterations that SGD should be run. + * @param numLocalIterations - number of inner iterations that SGD should be run. * @param regParam - regularization parameter * @param miniBatchFraction - fraction of the input data set that should be used for * one iteration of SGD. Default value 1.0. @@ -88,58 +89,70 @@ object GradientDescentWithLocalUpdate extends Logging { gradient: Gradient, updater: Updater, stepSize: Double, - numOuterIterations: Int, - numInnerIterations: Int, + numIterations: Int, + numLocalIterations: Int, regParam: Double, miniBatchFraction: Double, + tinyBatchSize: Int = 20, initialWeights: Array[Double]): (Array[Double], Array[Double]) = { - val stochasticLossHistory = new ArrayBuffer[Double](numOuterIterations) - - val numExamples: Long = data.count() - val numPartition = data.partitions.length - val miniBatchSize = numExamples * miniBatchFraction / numPartition + val stochasticLossHistory = new ArrayBuffer[Double](numIterations) // Initialize weights as a column vector var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*) - var regVal = 0.0 + val regVal = updater + .compute(weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2 + - for (i <- 1 to numOuterIterations) { + for (i <- 1 to numIterations) { val weightsAndLosses = data.mapPartitions { iter => - var iterReserved = iter - val localLossHistory = new ArrayBuffer[Double](numInnerIterations) + val rand = new Random(42 + i * numIterations) + val sampled = iter.filter(x => rand.nextDouble() <= miniBatchFraction) + + val ((weightsAvg, lossAvg, regValAvg), _) = sampled.grouped(tinyBatchSize).map { + case tinyDataSets => + val dataMatrices = tinyDataSets.map { + case (y, features) => (y, new DoubleMatrix(features.length, 1, features: _*)) + } - for (j <- 1 to numInnerIterations) { - val (iterCurrent, iterNext) = iterReserved.duplicate - val rand = new Random(42 + i * numOuterIterations + j) - val sampled = iterCurrent.filter(x => rand.nextDouble() <= miniBatchFraction) - val (gradientSum, lossSum) = sampled.map { case (y, features) => - val featuresCol = new DoubleMatrix(features.length, 1, features: _*) - gradient.compute(featuresCol, y, weights) - }.reduceOption((a, b) => (a._1.addi(b._1), a._2 + b._2)) - .getOrElse((DoubleMatrix.zeros(0), 0.0)) + Iterator.iterate((weights, 0.0, regVal)) { + case (lastWeights, lastLoss, lastRegVal) => - localLossHistory += lossSum / miniBatchSize + regVal + val (localGrad, localLoss) = dataMatrices.map { case (y, featuresCol) => + gradient.compute(featuresCol, y, lastWeights) + }.reduce((a, b) => (a._1.addi(b._1), a._2+b._2)) - val update = updater.compute(weights, gradientSum.div(miniBatchSize), - stepSize, (i - 1) + numOuterIterations + j, regParam) + val grad = localGrad.divi(dataMatrices.size) + val loss = localLoss / dataMatrices.size - weights = update._1 - regVal = update._2 + val (newWeights, newRegVal) = updater.compute( + lastWeights, grad.div(1.0), stepSize, i*numIterations, regParam) - iterReserved = iterNext + (newWeights, loss+lastLoss, newRegVal+lastRegVal) + + }.drop(numLocalIterations).next() + + }.foldLeft(((DoubleMatrix.zeros(initialWeights.length), 0.0, 0.0), 0.0)) { + case (((lMatrix, lLoss, lRegVal), count), (rMatrix, rLoss, rRegVal)) => + ((lMatrix.muli(count).addi(rMatrix).divi(count+1), + (lLoss*count+rLoss)/(count+1), + (lRegVal*count+rRegVal)/(count+1) + ), + count+1 + ) } - List((weights, localLossHistory.toArray)).iterator + val localLossHistory = (lossAvg + regValAvg) / numLocalIterations + + List((weightsAvg, localLossHistory)).iterator } val c = weightsAndLosses.collect() val (ws, ls) = c.unzip - stochasticLossHistory.append(ls.head.reduce(_ + _) / ls.head.size) + stochasticLossHistory.append(ls.reduce(_ + _) / ls.size) - val weightsSum = ws.reduce(_ addi _) - weights = weightsSum.divi(c.size) + weights = ws.reduce(_ addi _).divi(c.size) } logInfo("GradientDescentWithLocalUpdate finished. Last a few stochastic losses %s".format( diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala index a8998c0c6f586..4e2693648805d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentWithLocalUpdateSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.mllib.optimization import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers import org.apache.spark.mllib.regression._ import org.apache.spark.mllib.util.LocalSparkContext @@ -38,8 +37,8 @@ class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContex val gradient = new LogisticGradient() val updater = new SimpleUpdater() val stepSize = 1.0 - val numIterations = 10 - val numLocalIterations = 10 + val numIterations = 50 + val numLocalIterations = 2 val regParam = 0 val miniBatchFrac = 1.0 @@ -61,11 +60,13 @@ class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContex numLocalIterations, regParam, miniBatchFrac, + 20, initialWeightsWithIntercept) - assert(loss.last - loss.head < 0, "loss isn't decreasing.") + val lossToCompare = loss.drop(10) + assert(lossToCompare.last - lossToCompare.head < 0, "loss isn't decreasing.") - val lossDiff = loss.init.zip(loss.tail).map { case (lhs, rhs) => lhs - rhs } - assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8) + val lossDiff = lossToCompare.init.zip(lossToCompare.tail).map { case (lhs, rhs) => lhs - rhs } + assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8, "losses seem divergence.") } }