@@ -30,12 +30,11 @@ import scala.collection.mutable.ArrayBuffer
  * @param updater Updater to be used to update weights after every iteration.
  */
 class GradientDescent(var gradient: Gradient, var updater: Updater)
-  extends Optimizer with Logging
-{
-  private var stepSize: Double = 1.0
-  private var numIterations: Int = 100
-  private var regParam: Double = 0.0
-  private var miniBatchFraction: Double = 1.0
+  extends Optimizer with Logging {
+  protected var stepSize: Double = 1.0
+  protected var numIterations: Int = 100
+  protected var regParam: Double = 0.0
+  protected var miniBatchFraction: Double = 1.0
 
   /**
    * Set the initial step size of SGD for the first step. Default 1.0.
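The only change to GradientDescent itself is a brace-style cleanup and relaxing the SGD configuration fields from private to protected so that subclasses can read them. A minimal sketch of why the relaxed visibility matters follows; MySubclass is hypothetical and only for illustration, the real subclass this enables is GradientDescentWithLocalUpdate in the new file below.

// Hypothetical subclass, not part of this diff. With the fields still private the
// method body below would not compile; with protected the inherited configuration
// values are visible to the subclass.
class MySubclass(gradient: Gradient, updater: Updater)
  extends GradientDescent(gradient, updater) {
  def describeConfig(): String =
    "stepSize=" + stepSize + ", numIterations=" + numIterations +
    ", regParam=" + regParam + ", miniBatchFraction=" + miniBatchFraction
}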
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.optimization

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.jblas.DoubleMatrix

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD



/**
 * Class used to solve an optimization problem using Gradient Descent.
 * @param gradient Gradient function to be used.
 * @param updater Updater to be used to update weights after every iteration.
 */
class GradientDescentWithLocalUpdate(gradient: Gradient, updater: Updater)
  extends GradientDescent(gradient, updater) with Logging {
  private var numLocalIterations: Int = 1

  /**
   * Set the number of local iterations. Default 1.
   */
  def setNumLocalIterations(numLocalIter: Int): this.type = {
    this.numLocalIterations = numLocalIter
    this
  }

  override def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    : Array[Double] = {

    val (weights, stochasticLossHistory) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
      data,
      gradient,
      updater,
      stepSize,
      numIterations,
      numLocalIterations,
      regParam,
      miniBatchFraction,
      20,
      initialWeights)
    weights
  }

}

// Top-level method to run gradient descent.
object GradientDescentWithLocalUpdate extends Logging {
  /**
   * Run gradient descent with local updates in parallel using mini batches. Unlike
   * [[GradientDescent]], gradient descent here takes place not only across jobs, but also
   * within each job, i.e. locally on each executor.
   *
   * @param data - Input data for SGD. RDD of form (label, [feature values]).
   * @param gradient - Gradient object that will be used to compute the gradient.
   * @param updater - Updater object that will be used to update the model.
   * @param stepSize - step size to be used during update.
   * @param numIterations - number of outer iterations that SGD should be run.
   * @param numLocalIterations - number of inner (local) iterations that SGD should be run.
   * @param regParam - regularization parameter.
   * @param miniBatchFraction - fraction of the input data set that should be used for
   *                            one iteration of SGD. Default value 1.0.
   * @param tinyBatchSize - number of sampled examples grouped together when computing
   *                        local gradients. Default value 20.
   * @param initialWeights - initial weights for the model.
   *
   * @return A tuple containing two elements. The first element is a column matrix containing
   *         weights for every feature, and the second element is an array containing the
   *         stochastic loss computed for every iteration.
   */
  def runMiniBatchSGD(
      data: RDD[(Double, Array[Double])],
      gradient: Gradient,
      updater: Updater,
      stepSize: Double,
      numIterations: Int,
      numLocalIterations: Int,
      regParam: Double,
      miniBatchFraction: Double,
      tinyBatchSize: Int = 20,
      initialWeights: Array[Double]): (Array[Double], Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    // Initialize weights as a column vector
    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights: _*)
    val regVal = updater
      .compute(weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2


    for (i <- 1 to numIterations) {
      val weightsAndLosses = data.mapPartitions { iter =>
        // Sample a mini batch of this partition for the current outer iteration.
        val rand = new Random(42 + i * numIterations)
        val sampled = iter.filter(x => rand.nextDouble() <= miniBatchFraction)

        val ((weightsAvg, lossAvg, regValAvg), _) = sampled.grouped(tinyBatchSize).map {
          case tinyDataSets =>
            val dataMatrices = tinyDataSets.map {
              case (y, features) => (y, new DoubleMatrix(features.length, 1, features: _*))
            }

            // Starting from the globally averaged weights of the previous outer iteration,
            // run numLocalIterations SGD updates locally on this tiny batch.
            Iterator.iterate((weights, 0.0, regVal)) {
              case (lastWeights, lastLoss, lastRegVal) =>

                val (localGrad, localLoss) = dataMatrices.map { case (y, featuresCol) =>
                  gradient.compute(featuresCol, y, lastWeights)
                }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))

                val grad = localGrad.divi(dataMatrices.size)
                val loss = localLoss / dataMatrices.size

                val (newWeights, newRegVal) = updater.compute(
                  lastWeights, grad.div(1.0), stepSize, i * numIterations, regParam)

                (newWeights, loss + lastLoss, newRegVal + lastRegVal)

            }.drop(numLocalIterations).next()

        }.foldLeft(((DoubleMatrix.zeros(initialWeights.length), 0.0, 0.0), 0.0)) {
          // Maintain a running average of the locally updated weights, losses and
          // regularization values over all tiny batches seen so far in this partition.
          case (((lMatrix, lLoss, lRegVal), count), (rMatrix, rLoss, rRegVal)) =>
            ((lMatrix.muli(count).addi(rMatrix).divi(count + 1),
              (lLoss * count + rLoss) / (count + 1),
              (lRegVal * count + rRegVal) / (count + 1)
             ),
             count + 1
            )
        }

        val localLossHistory = (lossAvg + regValAvg) / numLocalIterations

        List((weightsAvg, localLossHistory)).iterator
      }

      val c = weightsAndLosses.collect()
      val (ws, ls) = c.unzip

      stochasticLossHistory.append(ls.reduce(_ + _) / ls.size)

      // Average the per-partition weight vectors to get the new global weights.
      weights = ws.reduce(_ addi _).divi(c.size)
    }

    logInfo("GradientDescentWithLocalUpdate finished. Last few stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights.toArray, stochasticLossHistory.toArray)
  }
}
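The foldLeft above keeps a running average rather than a running sum, so each partition only ever holds one accumulated weight vector regardless of how many tiny batches it processes. A minimal standalone sketch of the same incremental-average identity on plain Doubles (illustrative only, not part of this diff):

// Incremental average: avg_(n+1) = (avg_n * n + x) / (n + 1), the same update the
// foldLeft applies to the per-tiny-batch weights, losses and regularization values.
val xs = Seq(1.0, 4.0, 10.0)
val (avg, _) = xs.foldLeft((0.0, 0.0)) { case ((acc, count), x) =>
  ((acc * count + x) / (count + 1), count + 1)
}
// avg == 5.0, the same as xs.sum / xs.size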
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.optimization

import org.scalatest.FunSuite

import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.util.LocalSparkContext

class GradientDescentWithLocalUpdateSuite extends FunSuite with LocalSparkContext {

  import GradientDescentSuite._

  test("Assert the loss is decreasing.") {
    val nPoints = 10000
    val A = 2.0
    val B = -1.5

    val initialB = -1.0
    val initialWeights = Array(initialB)

    val gradient = new LogisticGradient()
    val updater = new SimpleUpdater()
    val stepSize = 1.0
    val numIterations = 50
    val numLocalIterations = 2
    val regParam = 0
    val miniBatchFrac = 1.0

    // Add an extra variable consisting of all 1.0's for the intercept.
    val testData = generateGDInput(A, B, nPoints, 42)
    val data = testData.map { case LabeledPoint(label, features) =>
      label -> Array(1.0, features: _*)
    }

    val dataRDD = sc.parallelize(data, 2).cache()
    val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)

    val (_, loss) = GradientDescentWithLocalUpdate.runMiniBatchSGD(
      dataRDD,
      gradient,
      updater,
      stepSize,
      numIterations,
      numLocalIterations,
      regParam,
      miniBatchFrac,
      20,
      initialWeightsWithIntercept)

    val lossToCompare = loss.drop(10)
    assert(lossToCompare.last - lossToCompare.head < 0, "loss isn't decreasing.")

    val lossDiff = lossToCompare.init.zip(lossToCompare.tail).map { case (lhs, rhs) => lhs - rhs }
    assert(lossDiff.count(_ > 0).toDouble / lossDiff.size > 0.8, "losses seem to diverge.")
  }
}
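The test drives the algorithm through the low-level runMiniBatchSGD entry point. For comparison, a rough sketch of the equivalent class-level usage is given below; it assumes the inherited setters on GradientDescent (setStepSize, setNumIterations, setMiniBatchFraction), which are not shown in this diff, and reuses the dataRDD and initialWeightsWithIntercept values built in the test.

// Illustrative sketch only -- the inherited setter names are assumed, not shown in this diff.
val optimizer = new GradientDescentWithLocalUpdate(new LogisticGradient(), new SimpleUpdater())
optimizer.setStepSize(1.0)
optimizer.setNumIterations(50)
optimizer.setMiniBatchFraction(1.0)
optimizer.setNumLocalIterations(2)  // the new knob introduced by this PR
val weights = optimizer.optimize(dataRDD, initialWeightsWithIntercept)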