LogisticRegression.scala
@@ -77,23 +77,21 @@ class LogisticRegressionModel private[mllib] (
class LogisticRegressionWithSGD private (
private var stepSize: Double,
private var numIterations: Int,
private var regParam: Double,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

private val gradient = new LogisticGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
private val regularizer = new SimpleRegularizer()
override val optimizer = new GradientDescent(gradient, regularizer)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
override protected val validators = List(DataValidators.binaryLabelValidator)

/**
* Construct a LogisticRegression object with default parameters
*/
def this() = this(1.0, 100, 0.0, 1.0)
def this() = this(1.0, 100, 1.0)

override protected def createModel(weights: Vector, intercept: Double) = {
new LogisticRegressionModel(weights, intercept)
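
Note: the Regularizer hierarchy this PR switches to (Regularizer, SimpleRegularizer, L2Regularizer) is introduced outside the files shown in this diff. A minimal sketch of the contract the call sites appear to assume, inferred from how GradientDescent.runMiniBatchSGD uses it below: compute(weights, gradient) adds the regularization subgradient to gradient in place and returns the regularization value.

import org.apache.spark.mllib.linalg.Vector

trait Regularizer extends Serializable {
  // Adds the regularization subgradient to `gradient` in place and returns
  // the regularization value for the given weights.
  def compute(weights: Vector, gradient: Vector): Double
}

// No regularization: the gradient is left untouched and the contribution
// to the objective is zero.
class SimpleRegularizer extends Regularizer {
  override def compute(weights: Vector, gradient: Vector): Double = 0.0
}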
@@ -128,7 +126,7 @@ object LogisticRegressionWithSGD {
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Vector): LogisticRegressionModel = {
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
new LogisticRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
.run(input, initialWeights)
}

@@ -149,7 +147,7 @@ object LogisticRegressionWithSGD {
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): LogisticRegressionModel = {
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
new LogisticRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
.run(input)
}

SVM.scala
@@ -83,11 +83,10 @@ class SVMWithSGD private (
extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {

private val gradient = new HingeGradient()
private val updater = new SquaredL2Updater()
override val optimizer = new GradientDescent(gradient, updater)
private val regularizer = new L2Regularizer(regParam)
override val optimizer = new GradientDescent(gradient, regularizer)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setRegParam(regParam)
.setMiniBatchFraction(miniBatchFraction)
override protected val validators = List(DataValidators.binaryLabelValidator)

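Note: L2Regularizer is likewise not shown in this diff. A sketch of what L2Regularizer(regParam) would do to match the SquaredL2Updater it replaces, using the Regularizer trait sketched above: add regParam * w to the gradient in place and return 0.5 * regParam * ||w||^2.

class L2Regularizer(regParam: Double) extends Regularizer {
  override def compute(weights: Vector, gradient: Vector): Double = {
    // Assumes dense vectors whose toArray exposes the backing array, so the
    // in-place update below is visible to the caller; this holds for the
    // vectors that GradientDescent.runMiniBatchSGD passes in.
    val w = weights.toArray
    val g = gradient.toArray
    var i = 0
    var squaredNorm = 0.0
    while (i < w.length) {
      g(i) += regParam * w(i) // gradient += regParam * w (L2 subgradient)
      squaredNorm += w(i) * w(i)
      i += 1
    }
    0.5 * regParam * squaredNorm // regularization value: 0.5 * lambda * ||w||^2
  }
}
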
GradientDescent.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.optimization

import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV}
import breeze.linalg.{axpy => brzAxpy, DenseVector => BDV}

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
@@ -29,10 +29,11 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
* Class used to solve an optimization problem using Gradient Descent.
* @param gradient Gradient function to be used.
* @param updater Updater to be used to update weights after every iteration.
* @param regularizer Regularizer used to apply the regularization term.
*/
class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater)
extends Optimizer with Logging {
class GradientDescent private[mllib] (
private var gradient: Gradient,
private var regularizer: Regularizer) extends Optimizer with Logging {

private var stepSize: Double = 1.0
private var numIterations: Int = 100
@@ -69,12 +70,27 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va

/**
* Set the regularization parameter. Default 0.0.
* This setter is deprecated; the regularization strength is now
* controlled by the regularizer.
*/
@Deprecated
def setRegParam(regParam: Double): this.type = {
this.regParam = regParam
this
}

/**
* Set the updater function that performs a gradient step in a given direction.
* The updater was also responsible for applying the update from the
* regularization term, and therefore determined what kind of regularization
* was used, if any.
* This is deprecated; use setRegularizer instead.
*/
@Deprecated
def setUpdater(updater: Updater): this.type = {
// No-op: updaters have been replaced by regularizers; the argument is ignored.
this
}

/**
* Set the gradient function (of the loss function of one single data example)
* to be used for SGD.
@@ -84,14 +100,11 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
this
}


/**
* Set the updater function to actually perform a gradient step in a given direction.
* The updater is responsible to perform the update from the regularization term as well,
* and therefore determines what kind or regularization is used, if any.
* Set the regularizer object to perform the regularization.
*/
def setUpdater(updater: Updater): this.type = {
this.updater = updater
def setRegularizer(regularizer: Regularizer): this.type = {
this.regularizer = regularizer
this
}

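For illustration, a hypothetical configuration of the optimizer with the new setter, using the Regularizer sketches above (values are arbitrary):

val optimizer = new GradientDescent(new LogisticGradient(), new SimpleRegularizer())
  .setStepSize(1.0)
  .setNumIterations(100)
  .setMiniBatchFraction(1.0)
  .setRegularizer(new L2Regularizer(0.01)) // swap in L2 regularization
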
@@ -107,10 +120,9 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
val (weights, _) = GradientDescent.runMiniBatchSGD(
data,
gradient,
updater,
regularizer,
stepSize,
numIterations,
regParam,
miniBatchFraction,
initialWeights)
weights
@@ -124,6 +136,18 @@ class GradientDescent private[mllib] (private var gradient: Gradient, private va
*/
@DeveloperApi
object GradientDescent extends Logging {

/**
* Run stochastic gradient descent (SGD) in parallel using mini batches.
* In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
@@ -135,10 +159,9 @@ object GradientDescent extends Logging {
* the form (label, [feature values]).
* @param gradient - Gradient object (used to compute the gradient of the loss function of
* one single data example)
* @param updater - Updater function to actually perform a gradient step in a given direction.
* @param regularizer - Regularizer that adds the regularization subgradient to the
* loss gradient and returns the regularization value.
* @param stepSize - initial step size for the first step
* @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
*
@@ -149,10 +172,9 @@ object GradientDescent extends Logging {
def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
regularizer: Regularizer,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {

@@ -162,42 +184,40 @@ object GradientDescent extends Logging {
val miniBatchSize = numExamples * miniBatchFraction

// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)

/**
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
val brzWeights = new BDV[Double](initialWeights.toArray.clone())

for (i <- 1 to numIterations) {
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
.aggregate((BDV.zeros[Double](weights.size), 0.0))(
.aggregate((BDV.zeros[Double](brzWeights.length), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
val l = gradient.compute(features, label,
Vectors.fromBreeze(brzWeights), Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
(grad1 += grad2, loss1 + loss2)
})

gradientSum :*= (1.0 / miniBatchSize)

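// The regularizer is expected to add its subgradient to gradientSum in place;
// only the returned regularization value is captured here.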
val regVal = regularizer.compute(Vectors.fromBreeze(brzWeights),
Vectors.fromBreeze(gradientSum))

/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
* lossSum is computed using the weights from the previous iteration, and regVal is
* the regularization value also computed with the weights from the previous iteration.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2

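// Gradient step with a decaying 1 / sqrt(i) learning rate:
// w := w - (stepSize / sqrt(i)) * (averaged loss gradient + regularization subgradient)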
val thisIterStepSize = stepSize / math.sqrt(i)
brzAxpy(-thisIterStepSize, gradientSum, brzWeights)
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))

(weights, stochasticLossHistory.toArray)
(Vectors.fromBreeze(brzWeights), stochasticLossHistory.toArray)
}
}
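
A hedged end-to-end sketch of calling the new entry point; trainingData and numFeatures are hypothetical placeholders:

val (finalWeights, lossHistory) = GradientDescent.runMiniBatchSGD(
  trainingData,              // RDD[(Double, Vector)] of (label, features)
  new LogisticGradient(),
  new L2Regularizer(0.01),   // as sketched above; not part of this diff
  1.0,                       // stepSize
  100,                       // numIterations
  1.0,                       // miniBatchFraction
  Vectors.dense(new Array[Double](numFeatures)))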