Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,138 @@ object GammaRegressionWithSGD {
train(input, numIterations, 1.0, 1.0)
}
}

/**
* Train a Gamma regression model with log link using L-BFGS.
*
* Here the data matrix has n rows, and the input RDD holds the set of rows of X, each with
* its corresponding right hand side label y.
*/
class GammaRegressionWithLBFGS private (
private var numCorrections: Int,
private var numIters: Int,
private var convergenceTol: Double,
private var regParam: Double)
extends GeneralizedLinearAlgorithm[GammaRegressionModel] with Serializable {

private val gradient = new GammaLogGradient()
private val updater = new SimpleUpdater()
override val optimizer = new LBFGS(gradient, updater)
.setNumCorrections(numCorrections)
.setConvergenceTol(convergenceTol)
.setMaxNumIterations(numIters)
.setRegParam(regParam)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both of us didn't use regParam in the updater. Do we want to remove this parameter or change SimpleUpdater to SquaredL2Updater or L1Updater?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. My opinion is that we keep this parameter, and use L2 regularization. Which regularization to use depends on the data set. In out test suite, L2 is preferable to L1 for the low-dimensional test data.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree


def this() = this(10, 100, 1e-4, 0.0)

override protected def createModel(weights: Vector, intercept: Double) = {
new GammaRegressionModel(weights, intercept)
}
}

object GammaRegressionWithLBFGS {

/**
* Train a GammaRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights. The weights are initialized using the initial weights provided.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
* @param regParam The regularization parameter for L-BFGS.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double,
regParam: Double,
initialWeights: Vector): GammaRegressionModel = {
new GammaRegressionWithLBFGS(
numCorrections,
numIterations,
convergTol,
regParam)
.setIntercept(true)
.run(input, initialWeights)
}

/**
* Train a GammaRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
* @param regParam The regularization parameter for L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double,
regParam: Double): GammaRegressionModel = {
new GammaRegressionWithLBFGS(
numCorrections,
numIterations,
convergTol,
regParam)
.setIntercept(true)
.run(input)
}

/**
* Train a GammaRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double): GammaRegressionModel = {
train(input, numIterations, numCorrections, convergTol, 0.0)
}

/**
* Train a GammaRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int): GammaRegressionModel = {
train(input, numIterations, numCorrections, 1e-4, 0.0)
}

/**
* Train a GammaRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int): GammaRegressionModel = {
train(input, numIterations, 10, 1e-4, 0.0)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,142 @@ class PoissonRegressionModel private[mllib] (
intercept: Double): Double = {
math.exp(weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept)
}

override def toString = {
"Log-linear model: (" + weights.toString + ", " + intercept + ")"
}
}

class PoissonRegressionWithLBFGS private (
private var numCorrections: Int,
private var numIters: Int,
private var convergenceTol: Double,
private var regParam: Double)
extends GeneralizedLinearAlgorithm[PoissonRegressionModel] with Serializable {

private val gradient = new PoissonGradient()
private val updater = new SimpleUpdater()

override val optimizer = new LBFGS(gradient, updater)
.setNumCorrections(numCorrections)
.setConvergenceTol(convergenceTol)
.setMaxNumIterations(numIters)
.setRegParam(regParam)

def this() = this(10, 100, 1e-4, 0.0)

override protected def createModel(weights: Vector, intercept: Double) = {
new PoissonRegressionModel(weights, intercept)
}
}

/**
* Entry for calling Poisson regression.
*/
object PoissonRegressionWithLBFGS {

/**
* Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights. The weights are initialized using the initial weights provided.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
* @param regParam The regularization parameter for L-BFGS.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double,
regParam: Double,
initialWeights: Vector): PoissonRegressionModel = {
new PoissonRegressionWithLBFGS(
numCorrections,
numIterations,
convergTol,
regParam)
.setIntercept(true)
.run(input, initialWeights)
}

/**
* Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
* @param regParam The regularization parameter for L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double,
regParam: Double): PoissonRegressionModel = {
new PoissonRegressionWithLBFGS(
numCorrections,
numIterations,
convergTol,
regParam)
.setIntercept(true)
.run(input)
}

/**
* Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
* @param convergTol The convergence tolerance of iterations for L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int,
convergTol: Double): PoissonRegressionModel = {
train(input, numIterations, numCorrections, convergTol, 0.0)
}

/**
* Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
* @param numCorrections Specific parameter for LBFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
numCorrections: Int): PoissonRegressionModel = {
train(input, numIterations, numCorrections, 1e-4, 0.0)
}

/**
* Train a PoissonRegression model given an RDD of (label, features) pairs. We run a L-BFGS to
* estimate the weights.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations The maximum number of iterations carried out by L-BFGS.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int): PoissonRegressionModel = {
train(input, numIterations, 10, 1e-4, 0.0)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.util

import scala.util.Random

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

@DeveloperApi
object PoissonRegressionDataGenerator {

/**
* Generate an RDD containing the sample data for PoissonRegression.
*
* We first randomly choose the parameters for the Poisson model, then generate
* a series of samples and the corresponding labels.
*
* @param sc SparkContext to use for creating the RDD.
* @param numExamples Number of examples that will be contained in the RDD.
* @param numFeatures Number of features to generate for each example.
* @param useIntercept Whether to use interception in the underlying parameters.
* @param numParts Number of partitions of the generated RDD. Default value is 2.
*/
def generatePoissonRegRDD(
sc: SparkContext,
numExamples: Int,
numFeatures: Int,
useIntercept: Boolean,
numParts: Int = 2): RDD[LabeledPoint] = {
val rnd = new Random(100083)

// the underlying possion regression paramters
val parameters = Vectors dense Array.fill[Double](numFeatures)(rnd.nextDouble())
val intercept = if (useIntercept) rnd.nextGaussian() else 0.0

// generate the data set
sc.parallelize(0 until numExamples, numParts) map { idx =>
val rnd = new Random(32 + idx)
val x = Vectors dense Array.fill[Double](numFeatures)(rnd.nextDouble() * 4.0)
val y = math.exp(parameters.toBreeze dot x.toBreeze + intercept)
LabeledPoint(math rint y, x)
}
}

def main(args: Array[String]) {
if (args.length != 5) {
println("Usage: PoissonRegressionGenerator " +
"<master> <output_dir> <num_examples> <num_features> <num_partitions>")
System.exit(1)
}

val sparkMaster: String = args(0)
val outputPath: String = args(1)
val nexamples: Int = if (args.length > 2) args(2).toInt else 1000
val nfeatures: Int = if (args.length > 3) args(3).toInt else 2
val parts: Int = if (args.length > 4) args(4).toInt else 2

val sc = new SparkContext(sparkMaster, "PoissonRegressionDataGenerator")
val data = generatePoissonRegRDD(sc, nexamples, nfeatures, false, parts)

data.saveAsTextFile(outputPath)
sc.stop()
}
}

Loading