From a37e285b108cd1b7c4583ccf65a05c6f3e6abc2a Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 9 Feb 2016 23:37:27 +0800 Subject: [PATCH 01/17] Initial version of Generalized Linear Regression --- .../GeneralizedLinearRegression.scala | 472 ++++++++++++++++++ .../GeneralizedLinearRegressionSuite.scala | 448 +++++++++++++++++ 2 files changed, 920 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala new file mode 100644 index 0000000000000..68797d50ee3cc --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.regression + +import breeze.stats.distributions.{Gaussian => GD} + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.optim._ +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.mllib.linalg.{BLAS, Vector} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions._ + +/** + * Params for Generalized Linear Regression. + */ +private[regression] trait GeneralizedLinearRegressionParams extends PredictorParams + with HasFitIntercept with HasMaxIter with HasTol with HasRegParam with HasWeightCol + with HasSolver with Logging { + + /** + * Param for the name of family which is a description of the error distribution + * to be used in the model. + * Supported options: "gaussian", "binomial", "poisson" and "gamma". + * @group param + */ + @Since("2.0.0") + final val family: Param[String] = new Param(this, "family", + "the name of family which is a description of the error distribution to be used in the model", + ParamValidators.inArray[String](GeneralizedLinearRegression.supportedFamilies.toArray)) + + /** @group getParam */ + @Since("2.0.0") + def getFamily: String = $(family) + + /** + * Param for the name of the model link function. + * Supported options: "identity", "log", "inverse", "logit", "probit", "cloglog" and "sqrt". + * @group param + */ + @Since("2.0.0") + final val link: Param[String] = new Param(this, "link", "the name of the model link function", + ParamValidators.inArray[String](GeneralizedLinearRegression.supportedLinks.toArray)) + + /** @group getParam */ + @Since("2.0.0") + def getLink: String = $(link) + + @Since("2.0.0") + override def validateParams(): Unit = { + require(GeneralizedLinearRegression.supportedFamilyLinkPairs.contains($(family) -> $(link)), + s"Generalized Linear Regression with ${$(family)} family does not support ${$(link)} " + + s"link function.") + } +} + +/** + * :: Experimental :: + * + * Fit a Generalized Linear Model ([[https://en.wikipedia.org/wiki/Generalized_linear_model]]) + * specified by giving a symbolic description of the linear predictor and + * a description of the error distribution. + */ +@Experimental +@Since("2.0.0") +class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val uid: String) + extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] + with GeneralizedLinearRegressionParams with Logging { + + @Since("2.0.0") + def this() = this(Identifiable.randomUID("genLinReg")) + + /** + * Set the name of family which is a description of the error distribution + * to be used in the model. + * @group setParam + */ + @Since("2.0.0") + def setFamily(value: String): this.type = set(family, value) + + /** + * Set the name of the model link function. + * @group setParam + */ + @Since("2.0.0") + def setLink(value: String): this.type = set(link, value) + + /** + * Set if we should fit the intercept. + * Default is true. + * @group setParam + */ + @Since("2.0.0") + def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) + setDefault(fitIntercept -> true) + + /** + * Set the maximum number of iterations. + * Default is 100. + * @group setParam + */ + @Since("2.0.0") + def setMaxIter(value: Int): this.type = set(maxIter, value) + setDefault(maxIter -> 100) + + /** + * Set the convergence tolerance of iterations. + * Smaller value will lead to higher accuracy with the cost of more iterations. + * Default is 1E-6. + * @group setParam + */ + @Since("2.0.0") + def setTol(value: Double): this.type = set(tol, value) + setDefault(tol -> 1E-6) + + /** + * Set the regularization parameter. + * Default is 0.0. + * @group setParam + */ + @Since("2.0.0") + def setRegParam(value: Double): this.type = set(regParam, value) + setDefault(regParam -> 0.0) + + /** + * Whether to over-/under-sample training instances according to the given weights in weightCol. + * If empty, all instances are treated equally (weight 1.0). + * Default is empty, so all instances have weight one. + * @group setParam + */ + @Since("2.0.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + setDefault(weightCol -> "") + + /** + * Set the solver algorithm used for optimization. + * Currently only support "irls" which is also the default solver. + * @group setParam + */ + @Since("2.0.0") + def setSolver(value: String): this.type = set(solver, value) + setDefault(solver -> "irls") + + override protected def train(dataset: DataFrame): GeneralizedLinearRegressionModel = { + val familyLink = $(family) match { + case "gaussian" => Gaussian($(link)) + case "binomial" => Binomial($(link)) + case "poisson" => Poisson($(link)) + case "gamma" => Gamma($(link)) + } + + val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + val instances: RDD[Instance] = dataset.select( + col($(labelCol)), w, col($(featuresCol))).map { + case Row(label: Double, weight: Double, features: Vector) => + Instance(label, weight, features) + } + + if ($(family) == "gaussian" && $(link) == "identity") { + val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), + standardizeFeatures = true, standardizeLabel = true) + val wlsModel = optimizer.fit(instances) + val model = copyValues(new GeneralizedLinearRegressionModel(uid, + wlsModel.coefficients, wlsModel.intercept).setParent(this)) + return model + } + + val newInstances = instances.map { instance => + val mu = familyLink.initialize(instance.label, instance.weight) + val eta = familyLink.predict(mu) + Instance(eta, instance.weight, instance.features) + } + + val initialModel = new WeightedLeastSquares($(fitIntercept), $(regParam), + standardizeFeatures = true, standardizeLabel = true).fit(newInstances) + + val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double) = { + (instance: Instance, model: WeightedLeastSquaresModel) => { + val eta = model.predict(instance.features) + val mu = familyLink.fitted(eta) + val z = familyLink.adjusted(instance.label, mu, eta) + val w = familyLink.weights(mu) * instance.weight + (z, w) + } + } + + val optimizer = new IterativelyReweightedLeastSquares(initialModel, reweightFunc, + $(fitIntercept), $(regParam), $(maxIter), $(tol)) + + val irlsModel = optimizer.fit(instances) + + val model = copyValues(new GeneralizedLinearRegressionModel(uid, + irlsModel.coefficients, irlsModel.intercept)) + model + } + + @Since("2.0.0") + override def copy(extra: ParamMap): GeneralizedLinearRegression = defaultCopy(extra) +} + +@Since("2.0.0") +object GeneralizedLinearRegression { + + /** Set of families that GeneralizedLinearRegression supports */ + private[ml] val supportedFamilies = Set("gaussian", "binomial", "poisson", "gamma") + + /** Set of links that GeneralizedLinearRegression supports */ + private[ml] val supportedLinks = Set("identity", "log", "inverse", "logit", "probit", + "cloglog", "sqrt") + + /** Set of family and link pairs that GeneralizedLinearRegression supports */ + private[ml] val supportedFamilyLinkPairs = Set( + "gaussian" -> "identity", "gaussian" -> "log", "gaussian" -> "inverse", + "binomial" -> "logit", "binomial" -> "probit", "binomial" -> "cloglog", + "poisson" -> "log", "poisson" -> "identity", "poisson" -> "sqrt", + "gamma" -> "inverse", "gamma" -> "identity", "gamma" -> "log" + ) +} + +/** + * :: Experimental :: + * Model produced by [[GeneralizedLinearRegression]]. + */ +@Experimental +@Since("2.0.0") +class GeneralizedLinearRegressionModel private[ml] ( + @Since("2.0.0") override val uid: String, + @Since("2.0.0") val coefficients: Vector, + @Since("2.0.0") val intercept: Double) + extends RegressionModel[Vector, GeneralizedLinearRegressionModel] + with GeneralizedLinearRegressionParams { + + private lazy val familyLink = $(family) match { + case "gaussian" => Gaussian($(link)) + case "binomial" => Binomial($(link)) + case "poisson" => Poisson($(link)) + case "gamma" => Gamma($(link)) + } + + override protected def predict(features: Vector): Double = { + val eta = BLAS.dot(features, coefficients) + intercept + familyLink.fitted(eta) + } + + @Since("2.0.0") + override def copy(extra: ParamMap): GeneralizedLinearRegressionModel = { + copyValues(new GeneralizedLinearRegressionModel(uid, coefficients, intercept), extra) + .setParent(parent) + } +} + +/** + * A description of the error distribution and link function to be used in the model. + * @param link a link function instance + */ +private[ml] abstract class Family(val link: Link) extends Serializable { + + /** Initialize the starting value for mu. */ + def initialize(y: Double, weight: Double): Double + + /** The variance of mu to its mean. */ + def variance(mu: Double): Double + + /** Weights for IRLS steps. */ + def weights(mu: Double): Double = { + 1.0 / (math.pow(this.link.deriv(mu), 2.0) * this.variance(mu)) + } + + /** The adjusted response variable. */ + def adjusted(y: Double, mu: Double, eta: Double): Double = { + eta + (y - mu) * link.deriv(mu) + } + + /** Linear predictors based on given mu. */ + def predict(mu: Double): Double = this.link.link(mu) + + /** Fitted values based on linear predictors eta. */ + def fitted(eta: Double): Double = this.link.unlink(eta) +} + +/** + * Gaussian exponential family distribution. + * The default link for the Gaussian family is the identity link. + * @param link a link function instance + */ +private[ml] class Gaussian(link: Link = Identity) extends Family(link) { + + override def initialize(y: Double, weight: Double): Double = y + + def variance(mu: Double): Double = 1.0 +} + +private[ml] object Gaussian { + + def apply(link: String): Gaussian = { + link match { + case "identity" => new Gaussian(Identity) + case "log" => new Gaussian(Log) + case "inverse" => new Gaussian(Inverse) + } + } +} + +/** + * Binomial exponential family distribution. + * The default link for the Binomial family is the logit link. + * @param link a link function instance + */ +private[ml] class Binomial(link: Link = Logit) extends Family(link) { + + override def initialize(y: Double, weight: Double): Double = { + (weight * y + 0.5) / (weight + 1.0) + } + + override def variance(mu: Double): Double = mu * (1 - mu) +} + +private[ml] object Binomial { + + def apply(link: String): Binomial = { + link match { + case "logit" => new Binomial(Logit) + case "probit" => new Binomial(Probit) + case "cloglog" => new Binomial(CLogLog) + } + } +} + +/** + * Poisson exponential family distribution. + * The default link for the Poisson family is the log link. + * @param link a link function instance + */ +private[ml] class Poisson(link: Link = Log) extends Family(link) { + + override def initialize(y: Double, weight: Double): Double = y + 0.1 + + override def variance(mu: Double): Double = mu +} + +private[ml] object Poisson { + + def apply(link: String): Poisson = { + link match { + case "log" => new Poisson(Log) + case "sqrt" => new Poisson(Sqrt) + case "identity" => new Poisson(Identity) + } + } +} + +/** + * Gamma exponential family distribution. + * The default link for the Gamma family is the inverse link. + * @param link a link function instance + */ +private[ml] class Gamma(link: Link = Log) extends Family(link) { + + override def initialize(y: Double, weight: Double): Double = y + + override def variance(mu: Double): Double = math.pow(mu, 2.0) +} + +private[ml] object Gamma { + + def apply(link: String): Gamma = { + link match { + case "inverse" => new Gamma(Inverse) + case "identity" => new Gamma(Identity) + case "log" => new Gamma(Log) + } + } +} + +/** + * A description of the link function to be used in the model. + */ +private[ml] trait Link extends Serializable { + + /** The link function. */ + def link(mu: Double): Double + + /** The derivative function. */ + def deriv(mu: Double): Double + + /** The inverse link function. */ + def unlink(eta: Double): Double +} + +private[ml] object Identity extends Link { + + override def link(mu: Double): Double = mu + + override def deriv(mu: Double): Double = 1.0 + + override def unlink(eta: Double): Double = eta +} + +private[ml] object Logit extends Link { + + override def link(mu: Double): Double = math.log(mu / (1.0 - mu)) + + override def deriv(mu: Double): Double = 1.0 / (mu * (1.0 - mu)) + + override def unlink(eta: Double): Double = 1.0 / (1.0 + math.exp(-1.0 * eta)) +} + +private[ml] object Log extends Link { + + override def link(mu: Double): Double = math.log(mu) + + override def deriv(mu: Double): Double = 1.0 / mu + + override def unlink(eta: Double): Double = math.exp(eta) +} + +private[ml] object Inverse extends Link { + + override def link(mu: Double): Double = 1.0 / mu + + override def deriv(mu: Double): Double = -1.0 * math.pow(mu, -2.0) + + override def unlink(eta: Double): Double = 1.0 / eta +} + +private[ml] object Probit extends Link { + + override def link(mu: Double): Double = GD(0.0, 1.0).icdf(mu) + + override def deriv(mu: Double): Double = 1.0 / GD(0.0, 1.0).pdf(GD(0.0, 1.0).icdf(mu)) + + override def unlink(eta: Double): Double = GD(0.0, 1.0).cdf(eta) +} + +private[ml] object CLogLog extends Link { + + override def link(mu: Double): Double = math.log(-1.0 * math.log(1 - mu)) + + override def deriv(mu: Double): Double = 1.0 / ((mu - 1.0) * math.log(1.0 - mu)) + + override def unlink(eta: Double): Double = 1.0 - math.exp(-1.0 * math.exp(eta)) +} + +private[ml] object Sqrt extends Link { + + override def link(mu: Double): Double = math.sqrt(mu) + + override def deriv(mu: Double): Double = 1.0 / (2.0 * math.sqrt(mu)) + + override def unlink(eta: Double): Double = math.pow(eta, 2.0) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala new file mode 100644 index 0000000000000..abfaad57d7ba8 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.regression + +import scala.util.Random + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.ml.util.MLTestingUtils +import org.apache.spark.mllib.classification.LogisticRegressionSuite._ +import org.apache.spark.mllib.linalg.{BLAS, DenseVector, Vectors} +import org.apache.spark.mllib.random._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.sql.{DataFrame, Row} + +class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext { + + private val seed: Int = 42 + @transient var datasetGaussianIdentity: DataFrame = _ + @transient var datasetGaussianLog: DataFrame = _ + @transient var datasetGaussianInverse: DataFrame = _ + @transient var datasetBinomial: DataFrame = _ + @transient var datasetPoissonLog: DataFrame = _ + @transient var datasetPoissonIdentity: DataFrame = _ + @transient var datasetPoissonSqrt: DataFrame = _ + @transient var datasetGammaInverse: DataFrame = _ + @transient var datasetGammaIdentity: DataFrame = _ + @transient var datasetGammaLog: DataFrame = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + import GeneralizedLinearRegressionSuite._ + + datasetGaussianIdentity = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gaussian", link = "identity"), 2)) + + datasetGaussianLog = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gaussian", link = "log"), 2)) + + datasetGaussianInverse = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gaussian", link = "inverse"), 2)) + + datasetBinomial = { + val nPoints = 10000 + val coefficients = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191) + val xMean = Array(5.843, 3.057, 3.758, 1.199) + val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) + + val testData = + generateMultinomialLogisticInput(coefficients, xMean, xVariance, true, nPoints, seed) + + sqlContext.createDataFrame(sc.parallelize(testData, 4)) + } + + datasetPoissonLog = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "poisson", link = "log"), 2)) + + datasetPoissonIdentity = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "poisson", link = "identity"), 2)) + + datasetPoissonSqrt = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "poisson", link = "sqrt"), 2)) + + datasetGammaInverse = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gamma", link = "inverse"), 2)) + + datasetGammaIdentity = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gamma", link = "identity"), 2)) + + datasetGammaLog = sqlContext.createDataFrame( + sc.parallelize(generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + family = "gamma", link = "log"), 2)) + } + + test("params") { + ParamsSuite.checkParams(new GeneralizedLinearRegression) + val model = new GeneralizedLinearRegressionModel("genLinReg", Vectors.dense(0.0), 0.0) + ParamsSuite.checkParams(model) + } + + test("generalized linear regression: default params") { + val glr = new GeneralizedLinearRegression + assert(glr.getLabelCol === "label") + assert(glr.getFeaturesCol === "features") + assert(glr.getPredictionCol === "prediction") + assert(glr.getFitIntercept) + assert(glr.getMaxIter === 100) + assert(glr.getTol === 1E-6) + assert(glr.getWeightCol === "") + assert(glr.getRegParam === 0.0) + assert(glr.getSolver == "irls") + val model = glr.setFamily("gaussian").setLink("identity") + .fit(datasetGaussianIdentity) + + // copied model must have the same parent. + MLTestingUtils.checkCopy(model) + model.transform(datasetGaussianIdentity) + .select("label", "prediction") + .collect() + + assert(model.getFeaturesCol === "features") + assert(model.getPredictionCol === "prediction") + assert(model.intercept !== 0.0) + assert(model.hasParent) + assert(model.getFamily === "gaussian") + assert(model.getLink === "identity") + } + + test("generalized linear regression: gaussian family") { + /* + R code: + f1 <- data$V1 ~ data$V2 + data$V3 - 1 + f2 <- data$V1 ~ data$V2 + data$V3 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family="gaussian", data=data) + print(as.vector(coef(model))) + } + + [1] 2.2960999 0.8087933 + [1] 2.5002642 2.2000403 0.5999485 + + data <- read.csv("path", header=FALSE) + model1 <- glm(f1, family=gaussian(link=log), data=data, start=c(0,0)) + model2 <- glm(f2, family=gaussian(link=log), data=data, start=c(0,0,0)) + print(as.vector(coef(model1))) + print(as.vector(coef(model2))) + + [1] 0.23069326 0.07993778 + [1] 0.25001858 0.22002452 0.05998789 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family=gaussian(link=inverse), data=data) + print(as.vector(coef(model))) + } + + [1] 2.3010179 0.8198976 + [1] 2.4108902 2.2130248 0.6086152 + */ + + val expected = Seq( + Vectors.dense(0.0, 2.2960999, 0.8087933), + Vectors.dense(2.5002642, 2.2000403, 0.5999485), + Vectors.dense(0.0, 0.23069326, 0.07993778), + Vectors.dense(0.25001858, 0.22002452, 0.05998789), + Vectors.dense(0.0, 2.3010179, 0.8198976), + Vectors.dense(2.4108902, 2.2130248, 0.6086152)) + + var idx = 0 + for ((link, dataset) <- Seq(("identity", datasetGaussianIdentity), ("log", datasetGaussianLog), + ("inverse", datasetGaussianInverse))) { + for (fitIntercept <- Seq(false, true)) { + val trainer = new GeneralizedLinearRegression().setFamily("gaussian").setLink(link) + .setFitIntercept(fitIntercept) + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + + val familyLink = Gaussian(link) + model.transform(dataset).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + val prediction2 = familyLink.fitted(eta) + assert(prediction1 ~== prediction2 relTol 1E-5) + } + + idx += 1 + } + } + } + + test("generalized linear regression: binomial family") { + /* + R code: + f1 <- data$V1 ~ data$V2 + data$V3 + data$V4 + data$V5 - 1 + f2 <- data$V1 ~ data$V2 + data$V3 + data$V4 + data$V5 + data <- read.csv("path", header=FALSE) + + for (formula in c(f1, f2)) { + model <- glm(formula, family="binomial", data=data) + print(as.vector(coef(model))) + } + + [1] -0.3560284 1.3010002 -0.3570805 -0.7406762 + [1] 2.8367406 -0.5896187 0.8931655 -0.3925169 -0.7996989 + + for (formula in c(f1, f2)) { + model <- glm(formula, family=binomial(link=probit), data=data) + print(as.vector(coef(model))) + } + + [1] -0.2134390 0.7800646 -0.2144267 -0.4438358 + [1] 1.6995366 -0.3524694 0.5332651 -0.2352985 -0.4780850 + + for (formula in c(f1, f2)) { + model <- glm(formula, family=binomial(link=cloglog), data=data) + print(as.vector(coef(model))) + } + + [1] -0.2832198 0.8434144 -0.2524727 -0.5293452 + [1] 1.5063590 -0.4038015 0.6133664 -0.2687882 -0.5541758 + */ + val expected = Seq( + Vectors.dense(0.0, -0.3560284, 1.3010002, -0.3570805, -0.7406762), + Vectors.dense(2.8367406, -0.5896187, 0.8931655, -0.3925169, -0.7996989), + Vectors.dense(0.0, -0.2134390, 0.7800646, -0.2144267, -0.4438358), + Vectors.dense(1.6995366, -0.3524694, 0.5332651, -0.2352985, -0.4780850), + Vectors.dense(0.0, -0.2832198, 0.8434144, -0.2524727, -0.5293452), + Vectors.dense(1.5063590, -0.4038015, 0.6133664, -0.2687882, -0.5541758)) + + var idx = 0 + for ((link, dataset) <- Seq(("logit", datasetBinomial), ("probit", datasetBinomial), + ("cloglog", datasetBinomial))) { + for (fitIntercept <- Seq(false, true)) { + val trainer = new GeneralizedLinearRegression().setFamily("binomial").setLink(link) + .setFitIntercept(fitIntercept) + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1), + model.coefficients(2), model.coefficients(3)) + assert(actual ~== expected(idx) absTol 1e-4) + + val familyLink = Binomial(link) + model.transform(dataset).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + val prediction2 = familyLink.fitted(eta) + assert(prediction1 ~== prediction2 relTol 1E-5) + } + + idx += 1 + } + } + } + + test("generalized linear regression: poisson family") { + /* + R code: + f1 <- data$V1 ~ data$V2 + data$V3 - 1 + f2 <- data$V1 ~ data$V2 + data$V3 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family="poisson", data=data) + print(as.vector(coef(model))) + } + + [1] 0.22999393 0.08047088 + [1] 0.25022353 0.21998599 0.05998621 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family=poisson(link=identity), data=data) + print(as.vector(coef(model))) + } + + [1] 2.2929501 0.8119415 + [1] 2.5012730 2.1999407 0.5999107 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family=poisson(link=sqrt), data=data) + print(as.vector(coef(model))) + } + + [1] 2.2958947 0.8090515 + [1] 2.5000480 2.1999972 0.5999968 + */ + val expected = Seq( + Vectors.dense(0.0, 0.22999393, 0.08047088), + Vectors.dense(0.25022353, 0.21998599, 0.05998621), + Vectors.dense(0.0, 2.2929501, 0.8119415), + Vectors.dense(2.5012730, 2.1999407, 0.5999107), + Vectors.dense(0.0, 2.2958947, 0.8090515), + Vectors.dense(2.5000480, 2.1999972, 0.5999968)) + + var idx = 0 + for ((link, dataset) <- Seq(("log", datasetPoissonLog), ("identity", datasetPoissonIdentity), + ("sqrt", datasetPoissonSqrt))) { + for (fitIntercept <- Seq(false, true)) { + val trainer = new GeneralizedLinearRegression().setFamily("poisson").setLink(link) + .setFitIntercept(fitIntercept) + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + + val familyLink = Poisson(link) + model.transform(dataset).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + val prediction2 = familyLink.fitted(eta) + assert(prediction1 ~== prediction2 relTol 1E-5) + } + + idx += 1 + } + } + } + + test("generalized linear regression: gamma family") { + /* + R code: + f1 <- data$V1 ~ data$V2 + data$V3 - 1 + f2 <- data$V1 ~ data$V2 + data$V3 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family="Gamma", data=data) + print(as.vector(coef(model))) + } + + [1] 2.3392419 0.8058058 + [1] 2.3507700 2.2533574 0.6042991 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family=Gamma(link=identity), data=data) + print(as.vector(coef(model))) + } + + [1] 2.2908883 0.8147796 + [1] 2.5002406 2.1998346 0.6000059 + + data <- read.csv("path", header=FALSE) + for (formula in c(f1, f2)) { + model <- glm(formula, family=Gamma(link=log), data=data) + print(as.vector(coef(model))) + } + + [1] 0.22958970 0.08091066 + [1] 0.25003210 0.21996957 0.06000215 + */ + val expected = Seq( + Vectors.dense(0.0, 2.3392419, 0.8058058), + Vectors.dense(2.3507700, 2.2533574, 0.6042991), + Vectors.dense(0.0, 2.2908883, 0.8147796), + Vectors.dense(2.5002406, 2.1998346, 0.6000059), + Vectors.dense(0.0, 0.22958970, 0.08091066), + Vectors.dense(0.25003210, 0.21996957, 0.06000215)) + + var idx = 0 + for ((link, dataset) <- Seq(("inverse", datasetGammaInverse), + ("identity", datasetGammaIdentity), ("log", datasetGammaLog))) { + for (fitIntercept <- Seq(false, true)) { + val trainer = new GeneralizedLinearRegression().setFamily("gamma").setLink(link) + .setFitIntercept(fitIntercept) + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + + val familyLink = Gamma(link) + model.transform(dataset).select("features", "prediction").collect().foreach { + case Row(features: DenseVector, prediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + val prediction2 = familyLink.fitted(eta) + assert(prediction1 ~== prediction2 relTol 1E-5) + } + + idx += 1 + } + } + } +} + +object GeneralizedLinearRegressionSuite { + + def generateGeneralizedLinearRegressionInput( + intercept: Double, + coefficients: Array[Double], + xMean: Array[Double], + xVariance: Array[Double], + nPoints: Int, + seed: Int, + eps: Double, + family: String, + link: String): Seq[LabeledPoint] = { + + val rnd = new Random(seed) + def rndElement(i: Int) = { + (rnd.nextDouble() - 0.5) * math.sqrt(12.0 * xVariance(i)) + xMean(i) + } + val (generator, mean) = family match { + case "gaussian" => (new StandardNormalGenerator, 0.0) + case "poisson" => (new PoissonGenerator(1.0), 1.0) + case "gamma" => (new GammaGenerator(1.0, 1.0), 1.0) + } + generator.setSeed(seed) + + (0 until nPoints).map { _ => + val features = Vectors.dense(coefficients.indices.map { rndElement(_) }.toArray) + val eta = BLAS.dot(Vectors.dense(coefficients), features) + intercept + val mu = link match { + case "identity" => eta + case "log" => math.exp(eta) + case "sqrt" => math.pow(eta, 2.0) + case "inverse" => 1.0 / eta + } + val label = mu + eps * (generator.nextValue() - mean) + // Return LabeledPoints with DenseVector + LabeledPoint(label, features) + } + } +} From 3082686d61a851c240f16a9cd7aafc97fa6e3296 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 10 Feb 2016 00:16:53 +0800 Subject: [PATCH 02/17] fix setParent --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 68797d50ee3cc..56074b45a84cb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -211,7 +211,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val irlsModel = optimizer.fit(instances) val model = copyValues(new GeneralizedLinearRegressionModel(uid, - irlsModel.coefficients, irlsModel.intercept)) + irlsModel.coefficients, irlsModel.intercept).setParent(this)) model } From 378ad6c9a08a2d48d5b1221ecb53d37d5f34c765 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 11 Feb 2016 22:30:20 +0800 Subject: [PATCH 03/17] clear doc of numFeatures <= 4096, and fix default link --- .../GeneralizedLinearRegression.scala | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 56074b45a84cb..d57cb21164862 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.regression import breeze.stats.distributions.{Gaussian => GD} -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkException} import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.feature.Instance @@ -164,10 +164,19 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val override protected def train(dataset: DataFrame): GeneralizedLinearRegressionModel = { val familyLink = $(family) match { - case "gaussian" => Gaussian($(link)) - case "binomial" => Binomial($(link)) - case "poisson" => Poisson($(link)) - case "gamma" => Gamma($(link)) + case "gaussian" => if (isDefined(link)) Gaussian($(link)) else Gaussian("identity") + case "binomial" => if (isDefined(link)) Binomial($(link)) else Binomial("logit") + case "poisson" => if (isDefined(link)) Poisson($(link)) else Poisson("log") + case "gamma" => if (isDefined(link)) Gamma($(link)) else Gamma("inverse") + } + + val numFeatures = dataset.select(col($(featuresCol))).limit(1).map { + case Row(features: Vector) => features.size + }.first() + if (numFeatures > 4096) { + val msg = "Currently, GeneralizedLinearRegression only supports number of features" + + s" <= 4096. Found $numFeatures in the input dataset." + throw new SparkException(msg) } val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) @@ -252,10 +261,10 @@ class GeneralizedLinearRegressionModel private[ml] ( with GeneralizedLinearRegressionParams { private lazy val familyLink = $(family) match { - case "gaussian" => Gaussian($(link)) - case "binomial" => Binomial($(link)) - case "poisson" => Poisson($(link)) - case "gamma" => Gamma($(link)) + case "gaussian" => if (isDefined(link)) Gaussian($(link)) else Gaussian("identity") + case "binomial" => if (isDefined(link)) Binomial($(link)) else Binomial("logit") + case "poisson" => if (isDefined(link)) Poisson($(link)) else Poisson("log") + case "gamma" => if (isDefined(link)) Gamma($(link)) else Gamma("inverse") } override protected def predict(features: Vector): Double = { @@ -279,7 +288,7 @@ private[ml] abstract class Family(val link: Link) extends Serializable { /** Initialize the starting value for mu. */ def initialize(y: Double, weight: Double): Double - /** The variance of mu to its mean. */ + /** The variance of the endogenous variable's mean, given the value mu. */ def variance(mu: Double): Double /** Weights for IRLS steps. */ @@ -375,7 +384,7 @@ private[ml] object Poisson { * The default link for the Gamma family is the inverse link. * @param link a link function instance */ -private[ml] class Gamma(link: Link = Log) extends Family(link) { +private[ml] class Gamma(link: Link = Inverse) extends Family(link) { override def initialize(y: Double, weight: Double): Double = y @@ -401,7 +410,7 @@ private[ml] trait Link extends Serializable { /** The link function. */ def link(mu: Double): Double - /** The derivative function. */ + /** Derivative of the link function. */ def deriv(mu: Double): Double /** The inverse link function. */ From 97c3f6a2b94d6705606dd020f61c5c604b562b4c Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 11 Feb 2016 22:36:17 +0800 Subject: [PATCH 04/17] fix validateParams --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index d57cb21164862..ab5ad683dbdb7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -69,9 +69,11 @@ private[regression] trait GeneralizedLinearRegressionParams extends PredictorPar @Since("2.0.0") override def validateParams(): Unit = { - require(GeneralizedLinearRegression.supportedFamilyLinkPairs.contains($(family) -> $(link)), - s"Generalized Linear Regression with ${$(family)} family does not support ${$(link)} " + - s"link function.") + if (isDefined(link)) { + require(GeneralizedLinearRegression.supportedFamilyLinkPairs.contains($(family) -> $(link)), + s"Generalized Linear Regression with ${$(family)} family does not support ${$(link)} " + + s"link function.") + } } } From cc101470bfd42b6fcabfe62ebf401ddf5d2bc1a7 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 14 Feb 2016 18:10:05 +0800 Subject: [PATCH 05/17] add constriction to mu for family --- .../GeneralizedLinearRegression.scala | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index ab5ad683dbdb7..4af16afa0cf35 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -295,19 +295,25 @@ private[ml] abstract class Family(val link: Link) extends Serializable { /** Weights for IRLS steps. */ def weights(mu: Double): Double = { - 1.0 / (math.pow(this.link.deriv(mu), 2.0) * this.variance(mu)) + val x = clean(mu) + 1.0 / (math.pow(this.link.deriv(x), 2.0) * this.variance(x)) } /** The adjusted response variable. */ def adjusted(y: Double, mu: Double, eta: Double): Double = { - eta + (y - mu) * link.deriv(mu) + val x = clean(mu) + eta + (y - x) * link.deriv(x) } /** Linear predictors based on given mu. */ - def predict(mu: Double): Double = this.link.link(mu) + def predict(mu: Double): Double = this.link.link(clean(mu)) /** Fitted values based on linear predictors eta. */ - def fitted(eta: Double): Double = this.link.unlink(eta) + def fitted(eta: Double): Double = clean(this.link.unlink(eta)) + + def clean(mu: Double): Double = mu + + val epsilon: Double = 1E-16 } /** @@ -317,7 +323,13 @@ private[ml] abstract class Family(val link: Link) extends Serializable { */ private[ml] class Gaussian(link: Link = Identity) extends Family(link) { - override def initialize(y: Double, weight: Double): Double = y + override def initialize(y: Double, weight: Double): Double = { + if (link == Log) { + require(y > 0.0, "The response variable of Gaussian family with Log link " + + s"should be positive, but got $y") + } + y + } def variance(mu: Double): Double = 1.0 } @@ -341,10 +353,23 @@ private[ml] object Gaussian { private[ml] class Binomial(link: Link = Logit) extends Family(link) { override def initialize(y: Double, weight: Double): Double = { - (weight * y + 0.5) / (weight + 1.0) + val mu = (weight * y + 0.5) / (weight + 1.0) + require(mu > 0.0 && mu < 1.0, "The response variable of Binomial family" + + s"should be in range (0, 1), but got $mu") + mu } - override def variance(mu: Double): Double = mu * (1 - mu) + override def variance(mu: Double): Double = mu * (1.0 - mu) + + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu > 1.0 - epsilon) { + 1.0 - epsilon + } else { + mu + } + } } private[ml] object Binomial { @@ -365,7 +390,11 @@ private[ml] object Binomial { */ private[ml] class Poisson(link: Link = Log) extends Family(link) { - override def initialize(y: Double, weight: Double): Double = y + 0.1 + override def initialize(y: Double, weight: Double): Double = { + require(y > 0.0, "The response variable of Poisson family " + + s"should be positive, but got $y") + y + } override def variance(mu: Double): Double = mu } @@ -388,7 +417,11 @@ private[ml] object Poisson { */ private[ml] class Gamma(link: Link = Inverse) extends Family(link) { - override def initialize(y: Double, weight: Double): Double = y + override def initialize(y: Double, weight: Double): Double = { + require(y > 0.0, "The response variable of Gamma family " + + s"should be positive, but got $y") + y + } override def variance(mu: Double): Double = math.pow(mu, 2.0) } From 4a27970486091b6359b9d73ba477044d04c20875 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 15 Feb 2016 18:31:57 +0800 Subject: [PATCH 06/17] Add clean for Gaussian, Poisson and Gamma --- .../GeneralizedLinearRegression.scala | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 4af16afa0cf35..1f5d2a217fc62 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -305,12 +305,13 @@ private[ml] abstract class Family(val link: Link) extends Serializable { eta + (y - x) * link.deriv(x) } - /** Linear predictors based on given mu. */ + /** Linear predictor based on given mu. */ def predict(mu: Double): Double = this.link.link(clean(mu)) - /** Fitted values based on linear predictors eta. */ + /** Fitted value based on linear predictor eta. */ def fitted(eta: Double): Double = clean(this.link.unlink(eta)) + /** Trim the fitted value so that it will be in valid range. */ def clean(mu: Double): Double = mu val epsilon: Double = 1E-16 @@ -332,6 +333,16 @@ private[ml] class Gaussian(link: Link = Identity) extends Family(link) { } def variance(mu: Double): Double = 1.0 + + override def clean(mu: Double): Double = { + if (mu.isNegInfinity) { + Double.MinValue + } else if (mu.isPosInfinity) { + Double.MaxValue + } else { + mu + } + } } private[ml] object Gaussian { @@ -397,6 +408,16 @@ private[ml] class Poisson(link: Link = Log) extends Family(link) { } override def variance(mu: Double): Double = mu + + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu.isInfinity) { + Double.MaxValue + } else { + mu + } + } } private[ml] object Poisson { @@ -424,6 +445,16 @@ private[ml] class Gamma(link: Link = Inverse) extends Family(link) { } override def variance(mu: Double): Double = math.pow(mu, 2.0) + + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu.isInfinity) { + Double.MaxValue + } else { + mu + } + } } private[ml] object Gamma { From cc90cb7424bbbff700ba345628b39358b5c5756b Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 23 Feb 2016 17:12:37 +0800 Subject: [PATCH 07/17] Do some refactor of GLM --- .../spark/ml/optim/WeightedLeastSquares.scala | 6 +- .../GeneralizedLinearRegression.scala | 528 +++++++++--------- .../ml/regression/LinearRegression.scala | 4 +- .../GeneralizedLinearRegressionSuite.scala | 16 +- 4 files changed, 279 insertions(+), 275 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 61b3642131810..d35699b5d764d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -156,6 +156,8 @@ private[ml] class WeightedLeastSquares( private[ml] object WeightedLeastSquares { + val MaxNumFeatures: Int = 4096 + /** * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]]. */ @@ -174,8 +176,8 @@ private[ml] object WeightedLeastSquares { private var aaSum: DenseVector = _ private def init(k: Int): Unit = { - require(k <= 4096, "In order to take the normal equation approach efficiently, " + - s"we set the max number of features to 4096 but got $k.") + require(k <= MaxNumFeatures, "In order to take the normal equation approach efficiently, " + + s"we set the max number of features to $MaxNumFeatures but got $k.") this.k = k triK = k * (k + 1) / 2 count = 0L diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 1f5d2a217fc62..42adf0e20d7d4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.functions._ /** * Params for Generalized Linear Regression. */ -private[regression] trait GeneralizedLinearRegressionParams extends PredictorParams +private[regression] trait GeneralizedLinearRegressionBase extends PredictorParams with HasFitIntercept with HasMaxIter with HasTol with HasRegParam with HasWeightCol with HasSolver with Logging { @@ -48,7 +48,7 @@ private[regression] trait GeneralizedLinearRegressionParams extends PredictorPar @Since("2.0.0") final val family: Param[String] = new Param(this, "family", "the name of family which is a description of the error distribution to be used in the model", - ParamValidators.inArray[String](GeneralizedLinearRegression.supportedFamilies.toArray)) + ParamValidators.inArray[String](GeneralizedLinearRegression.supportedFamilyNames.toArray)) /** @group getParam */ @Since("2.0.0") @@ -61,16 +61,22 @@ private[regression] trait GeneralizedLinearRegressionParams extends PredictorPar */ @Since("2.0.0") final val link: Param[String] = new Param(this, "link", "the name of the model link function", - ParamValidators.inArray[String](GeneralizedLinearRegression.supportedLinks.toArray)) + ParamValidators.inArray[String](GeneralizedLinearRegression.supportedLinkNames.toArray)) /** @group getParam */ @Since("2.0.0") def getLink: String = $(link) + import GeneralizedLinearRegression._ + protected lazy val familyObj = Family.fromName($(family)) + protected lazy val linkObj = if (isDefined(link)) Link.fromName($(link)) else familyObj.defaultLink + protected lazy val familyAndLink = new FamilyAndLink(familyObj, linkObj) + @Since("2.0.0") override def validateParams(): Unit = { if (isDefined(link)) { - require(GeneralizedLinearRegression.supportedFamilyLinkPairs.contains($(family) -> $(link)), + import GeneralizedLinearRegression._ + require(supportedFamilyAndLinkParis.contains(familyObj -> linkObj), s"Generalized Linear Regression with ${$(family)} family does not support ${$(link)} " + s"link function.") } @@ -88,21 +94,20 @@ private[regression] trait GeneralizedLinearRegressionParams extends PredictorPar @Since("2.0.0") class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val uid: String) extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] - with GeneralizedLinearRegressionParams with Logging { + with GeneralizedLinearRegressionBase with Logging { @Since("2.0.0") - def this() = this(Identifiable.randomUID("genLinReg")) + def this() = this(Identifiable.randomUID("glm")) /** - * Set the name of family which is a description of the error distribution - * to be used in the model. + * Sets the value of param [[family]]. * @group setParam */ @Since("2.0.0") def setFamily(value: String): this.type = set(family, value) /** - * Set the name of the model link function. + * Sets the value of param [[link]]. * @group setParam */ @Since("2.0.0") @@ -115,7 +120,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val */ @Since("2.0.0") def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) - setDefault(fitIntercept -> true) /** * Set the maximum number of iterations. @@ -146,8 +150,8 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(regParam -> 0.0) /** - * Whether to over-/under-sample training instances according to the given weights in weightCol. - * If empty, all instances are treated equally (weight 1.0). + * Sets [[weightCol]]. + * If this is not set or empty, we treat all instance weights as 1.0. * Default is empty, so all instances have weight one. * @group setParam */ @@ -165,64 +169,49 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(solver -> "irls") override protected def train(dataset: DataFrame): GeneralizedLinearRegressionModel = { - val familyLink = $(family) match { - case "gaussian" => if (isDefined(link)) Gaussian($(link)) else Gaussian("identity") - case "binomial" => if (isDefined(link)) Binomial($(link)) else Binomial("logit") - case "poisson" => if (isDefined(link)) Poisson($(link)) else Poisson("log") - case "gamma" => if (isDefined(link)) Gamma($(link)) else Gamma("inverse") - } - val numFeatures = dataset.select(col($(featuresCol))).limit(1).map { case Row(features: Vector) => features.size }.first() - if (numFeatures > 4096) { + if (numFeatures > WeightedLeastSquares.MaxNumFeatures) { val msg = "Currently, GeneralizedLinearRegression only supports number of features" + s" <= 4096. Found $numFeatures in the input dataset." throw new SparkException(msg) } val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val instances: RDD[Instance] = dataset.select( - col($(labelCol)), w, col($(featuresCol))).map { - case Row(label: Double, weight: Double, features: Vector) => + val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))) + .map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) - } + } if ($(family) == "gaussian" && $(link) == "identity") { val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), standardizeFeatures = true, standardizeLabel = true) val wlsModel = optimizer.fit(instances) - val model = copyValues(new GeneralizedLinearRegressionModel(uid, - wlsModel.coefficients, wlsModel.intercept).setParent(this)) + val model = copyValues( + new GeneralizedLinearRegressionModel(uid, wlsModel.coefficients, wlsModel.intercept) + .setParent(this)) return model } val newInstances = instances.map { instance => - val mu = familyLink.initialize(instance.label, instance.weight) - val eta = familyLink.predict(mu) + val mu = familyObj.initialize(instance.label, instance.weight) + val eta = familyAndLink.predict(mu) Instance(eta, instance.weight, instance.features) } val initialModel = new WeightedLeastSquares($(fitIntercept), $(regParam), - standardizeFeatures = true, standardizeLabel = true).fit(newInstances) + standardizeFeatures = true, standardizeLabel = true) + .fit(newInstances) - val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double) = { - (instance: Instance, model: WeightedLeastSquaresModel) => { - val eta = model.predict(instance.features) - val mu = familyLink.fitted(eta) - val z = familyLink.adjusted(instance.label, mu, eta) - val w = familyLink.weights(mu) * instance.weight - (z, w) - } - } - - val optimizer = new IterativelyReweightedLeastSquares(initialModel, reweightFunc, + val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol)) val irlsModel = optimizer.fit(instances) - val model = copyValues(new GeneralizedLinearRegressionModel(uid, - irlsModel.coefficients, irlsModel.intercept).setParent(this)) + val model = copyValues( + new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept) + .setParent(this)) model } @@ -231,317 +220,322 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } @Since("2.0.0") -object GeneralizedLinearRegression { +private[ml] object GeneralizedLinearRegression { + + /** Set of family and link pairs that GeneralizedLinearRegression supports */ + lazy val supportedFamilyAndLinkParis = Set( + Gaussian -> Identity, Gaussian -> Log, Gaussian -> Inverse, + Binomial -> Logit, Binomial -> Probit, Binomial -> CLogLog, + Poisson -> Log, Poisson -> Identity, Poisson -> Sqrt, + Gamma -> Inverse, Gamma -> Identity, Gamma -> Log + ) /** Set of families that GeneralizedLinearRegression supports */ - private[ml] val supportedFamilies = Set("gaussian", "binomial", "poisson", "gamma") + lazy val supportedFamilyNames = supportedFamilyAndLinkParis.map(_._1.name) /** Set of links that GeneralizedLinearRegression supports */ - private[ml] val supportedLinks = Set("identity", "log", "inverse", "logit", "probit", - "cloglog", "sqrt") + lazy val supportedLinkNames = supportedFamilyAndLinkParis.map(_._2.name) - /** Set of family and link pairs that GeneralizedLinearRegression supports */ - private[ml] val supportedFamilyLinkPairs = Set( - "gaussian" -> "identity", "gaussian" -> "log", "gaussian" -> "inverse", - "binomial" -> "logit", "binomial" -> "probit", "binomial" -> "cloglog", - "poisson" -> "log", "poisson" -> "identity", "poisson" -> "sqrt", - "gamma" -> "inverse", "gamma" -> "identity", "gamma" -> "log" - ) -} + val epsilon: Double = 1E-16 -/** - * :: Experimental :: - * Model produced by [[GeneralizedLinearRegression]]. - */ -@Experimental -@Since("2.0.0") -class GeneralizedLinearRegressionModel private[ml] ( - @Since("2.0.0") override val uid: String, - @Since("2.0.0") val coefficients: Vector, - @Since("2.0.0") val intercept: Double) - extends RegressionModel[Vector, GeneralizedLinearRegressionModel] - with GeneralizedLinearRegressionParams { + /** + * One wrapper of family and link instance to be used in the model. + * @param family the family instance + * @param link the link instance + */ + private[ml] class FamilyAndLink(val family: Family, var link: Link) extends Serializable { - private lazy val familyLink = $(family) match { - case "gaussian" => if (isDefined(link)) Gaussian($(link)) else Gaussian("identity") - case "binomial" => if (isDefined(link)) Binomial($(link)) else Binomial("logit") - case "poisson" => if (isDefined(link)) Poisson($(link)) else Poisson("log") - case "gamma" => if (isDefined(link)) Gamma($(link)) else Gamma("inverse") - } + /** Weights for IRLS steps. */ + def weights(mu: Double): Double = { + val x = family.clean(mu) + 1.0 / (math.pow(this.link.deriv(x), 2.0) * family.variance(x)) + } - override protected def predict(features: Vector): Double = { - val eta = BLAS.dot(features, coefficients) + intercept - familyLink.fitted(eta) - } + /** The adjusted response variable. */ + def adjusted(y: Double, mu: Double, eta: Double): Double = { + val x = family.clean(mu) + eta + (y - x) * link.deriv(x) + } - @Since("2.0.0") - override def copy(extra: ParamMap): GeneralizedLinearRegressionModel = { - copyValues(new GeneralizedLinearRegressionModel(uid, coefficients, intercept), extra) - .setParent(parent) + /** Linear predictor based on given mu. */ + def predict(mu: Double): Double = link.link(family.clean(mu)) + + /** Fitted value based on linear predictor eta. */ + def fitted(eta: Double): Double = family.clean(link.unlink(eta)) + + val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double) = { + (instance: Instance, model: WeightedLeastSquaresModel) => { + val eta = model.predict(instance.features) + val mu = fitted(eta) + val z = adjusted(instance.label, mu, eta) + val w = weights(mu) * instance.weight + (z, w) + } + } } -} -/** - * A description of the error distribution and link function to be used in the model. - * @param link a link function instance - */ -private[ml] abstract class Family(val link: Link) extends Serializable { + /** + * A description of the error distribution to be used in the model. + * @param name the name of the family + */ + private[ml] abstract class Family(val name: String) extends Serializable { - /** Initialize the starting value for mu. */ - def initialize(y: Double, weight: Double): Double + /** The default link instance of this family. */ + val defaultLink: Link - /** The variance of the endogenous variable's mean, given the value mu. */ - def variance(mu: Double): Double + /** Initialize the starting value for mu. */ + def initialize(y: Double, weight: Double): Double - /** Weights for IRLS steps. */ - def weights(mu: Double): Double = { - val x = clean(mu) - 1.0 / (math.pow(this.link.deriv(x), 2.0) * this.variance(x)) - } + /** The variance of the endogenous variable's mean, given the value mu. */ + def variance(mu: Double): Double - /** The adjusted response variable. */ - def adjusted(y: Double, mu: Double, eta: Double): Double = { - val x = clean(mu) - eta + (y - x) * link.deriv(x) + /** Trim the fitted value so that it will be in valid range. */ + def clean(mu: Double): Double = mu } - /** Linear predictor based on given mu. */ - def predict(mu: Double): Double = this.link.link(clean(mu)) + private[ml] object Family { + + /** + * Gets the [[Family]] object from its name. + * @param name family name: "gaussian", "binomial", "poisson" or "gamma". + */ + def fromName(name: String): Family = { + name match { + case Gaussian.name => Gaussian + case Binomial.name => Binomial + case Poisson.name => Poisson + case Gamma.name => Gamma + } + } + } - /** Fitted value based on linear predictor eta. */ - def fitted(eta: Double): Double = clean(this.link.unlink(eta)) + /** + * Gaussian exponential family distribution. + * The default link for the Gaussian family is the identity link. + */ + private[ml] object Gaussian extends Family("gaussian") { - /** Trim the fitted value so that it will be in valid range. */ - def clean(mu: Double): Double = mu + val defaultLink: Link = Identity - val epsilon: Double = 1E-16 -} + override def initialize(y: Double, weight: Double): Double = y -/** - * Gaussian exponential family distribution. - * The default link for the Gaussian family is the identity link. - * @param link a link function instance - */ -private[ml] class Gaussian(link: Link = Identity) extends Family(link) { + def variance(mu: Double): Double = 1.0 - override def initialize(y: Double, weight: Double): Double = { - if (link == Log) { - require(y > 0.0, "The response variable of Gaussian family with Log link " + - s"should be positive, but got $y") + override def clean(mu: Double): Double = { + if (mu.isNegInfinity) { + Double.MinValue + } else if (mu.isPosInfinity) { + Double.MaxValue + } else { + mu + } } - y } - def variance(mu: Double): Double = 1.0 + /** + * Binomial exponential family distribution. + * The default link for the Binomial family is the logit link. + */ + private[ml] object Binomial extends Family("binomial") { + + val defaultLink: Link = Logit - override def clean(mu: Double): Double = { - if (mu.isNegInfinity) { - Double.MinValue - } else if (mu.isPosInfinity) { - Double.MaxValue - } else { + override def initialize(y: Double, weight: Double): Double = { + val mu = (weight * y + 0.5) / (weight + 1.0) + require(mu > 0.0 && mu < 1.0, "The response variable of Binomial family" + + s"should be in range (0, 1), but got $mu") mu } - } -} -private[ml] object Gaussian { + override def variance(mu: Double): Double = mu * (1.0 - mu) - def apply(link: String): Gaussian = { - link match { - case "identity" => new Gaussian(Identity) - case "log" => new Gaussian(Log) - case "inverse" => new Gaussian(Inverse) + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu > 1.0 - epsilon) { + 1.0 - epsilon + } else { + mu + } } } -} - -/** - * Binomial exponential family distribution. - * The default link for the Binomial family is the logit link. - * @param link a link function instance - */ -private[ml] class Binomial(link: Link = Logit) extends Family(link) { - override def initialize(y: Double, weight: Double): Double = { - val mu = (weight * y + 0.5) / (weight + 1.0) - require(mu > 0.0 && mu < 1.0, "The response variable of Binomial family" + - s"should be in range (0, 1), but got $mu") - mu - } + /** + * Poisson exponential family distribution. + * The default link for the Poisson family is the log link. + */ + private[ml] object Poisson extends Family("poisson") { - override def variance(mu: Double): Double = mu * (1.0 - mu) + val defaultLink: Link = Log - override def clean(mu: Double): Double = { - if (mu < epsilon) { - epsilon - } else if (mu > 1.0 - epsilon) { - 1.0 - epsilon - } else { - mu + override def initialize(y: Double, weight: Double): Double = { + require(y > 0.0, "The response variable of Poisson family " + + s"should be positive, but got $y") + y } - } -} -private[ml] object Binomial { + override def variance(mu: Double): Double = mu - def apply(link: String): Binomial = { - link match { - case "logit" => new Binomial(Logit) - case "probit" => new Binomial(Probit) - case "cloglog" => new Binomial(CLogLog) + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu.isInfinity) { + Double.MaxValue + } else { + mu + } } } -} -/** - * Poisson exponential family distribution. - * The default link for the Poisson family is the log link. - * @param link a link function instance - */ -private[ml] class Poisson(link: Link = Log) extends Family(link) { - - override def initialize(y: Double, weight: Double): Double = { - require(y > 0.0, "The response variable of Poisson family " + - s"should be positive, but got $y") - y - } + /** + * Gamma exponential family distribution. + * The default link for the Gamma family is the inverse link. + */ + private[ml] object Gamma extends Family("gamma") { - override def variance(mu: Double): Double = mu + val defaultLink: Link = Inverse - override def clean(mu: Double): Double = { - if (mu < epsilon) { - epsilon - } else if (mu.isInfinity) { - Double.MaxValue - } else { - mu + override def initialize(y: Double, weight: Double): Double = { + require(y > 0.0, "The response variable of Gamma family " + + s"should be positive, but got $y") + y } - } -} -private[ml] object Poisson { + override def variance(mu: Double): Double = math.pow(mu, 2.0) - def apply(link: String): Poisson = { - link match { - case "log" => new Poisson(Log) - case "sqrt" => new Poisson(Sqrt) - case "identity" => new Poisson(Identity) + override def clean(mu: Double): Double = { + if (mu < epsilon) { + epsilon + } else if (mu.isInfinity) { + Double.MaxValue + } else { + mu + } } } -} -/** - * Gamma exponential family distribution. - * The default link for the Gamma family is the inverse link. - * @param link a link function instance - */ -private[ml] class Gamma(link: Link = Inverse) extends Family(link) { + /** + * A description of the link function to be used in the model. + * The link function provides the relationship between the linear predictor + * and the mean of the distribution function. + * @param name the name of link function + */ + private[ml] abstract class Link(val name: String) extends Serializable { - override def initialize(y: Double, weight: Double): Double = { - require(y > 0.0, "The response variable of Gamma family " + - s"should be positive, but got $y") - y - } + /** The link function. */ + def link(mu: Double): Double - override def variance(mu: Double): Double = math.pow(mu, 2.0) + /** Derivative of the link function. */ + def deriv(mu: Double): Double - override def clean(mu: Double): Double = { - if (mu < epsilon) { - epsilon - } else if (mu.isInfinity) { - Double.MaxValue - } else { - mu - } + /** The inverse link function. */ + def unlink(eta: Double): Double } -} -private[ml] object Gamma { - - def apply(link: String): Gamma = { - link match { - case "inverse" => new Gamma(Inverse) - case "identity" => new Gamma(Identity) - case "log" => new Gamma(Log) + private[ml] object Link { + + /** + * Gets the [[Link]] object from its name. + * @param name link name: "identity", "logit", "log", "inverse", "probit", "cloglog" or "sqrt". + */ + def fromName(name: String): Link = { + name match { + case Identity.name => Identity + case Logit.name => Logit + case Log.name => Log + case Inverse.name => Inverse + case Probit.name => Probit + case CLogLog.name => CLogLog + case Sqrt.name => Sqrt + } } } -} -/** - * A description of the link function to be used in the model. - */ -private[ml] trait Link extends Serializable { + private[ml] object Identity extends Link("identity") { - /** The link function. */ - def link(mu: Double): Double + override def link(mu: Double): Double = mu - /** Derivative of the link function. */ - def deriv(mu: Double): Double + override def deriv(mu: Double): Double = 1.0 - /** The inverse link function. */ - def unlink(eta: Double): Double -} + override def unlink(eta: Double): Double = eta + } -private[ml] object Identity extends Link { + private[ml] object Logit extends Link("logit") { - override def link(mu: Double): Double = mu + override def link(mu: Double): Double = math.log(mu / (1.0 - mu)) - override def deriv(mu: Double): Double = 1.0 + override def deriv(mu: Double): Double = 1.0 / (mu * (1.0 - mu)) - override def unlink(eta: Double): Double = eta -} + override def unlink(eta: Double): Double = 1.0 / (1.0 + math.exp(-1.0 * eta)) + } -private[ml] object Logit extends Link { + private[ml] object Log extends Link("log") { - override def link(mu: Double): Double = math.log(mu / (1.0 - mu)) + override def link(mu: Double): Double = math.log(mu) - override def deriv(mu: Double): Double = 1.0 / (mu * (1.0 - mu)) + override def deriv(mu: Double): Double = 1.0 / mu - override def unlink(eta: Double): Double = 1.0 / (1.0 + math.exp(-1.0 * eta)) -} + override def unlink(eta: Double): Double = math.exp(eta) + } -private[ml] object Log extends Link { + private[ml] object Inverse extends Link("inverse") { - override def link(mu: Double): Double = math.log(mu) + override def link(mu: Double): Double = 1.0 / mu - override def deriv(mu: Double): Double = 1.0 / mu + override def deriv(mu: Double): Double = -1.0 * math.pow(mu, -2.0) - override def unlink(eta: Double): Double = math.exp(eta) -} + override def unlink(eta: Double): Double = 1.0 / eta + } -private[ml] object Inverse extends Link { + private[ml] object Probit extends Link("probit") { - override def link(mu: Double): Double = 1.0 / mu + override def link(mu: Double): Double = GD(0.0, 1.0).icdf(mu) - override def deriv(mu: Double): Double = -1.0 * math.pow(mu, -2.0) + override def deriv(mu: Double): Double = 1.0 / GD(0.0, 1.0).pdf(GD(0.0, 1.0).icdf(mu)) - override def unlink(eta: Double): Double = 1.0 / eta -} + override def unlink(eta: Double): Double = GD(0.0, 1.0).cdf(eta) + } -private[ml] object Probit extends Link { + private[ml] object CLogLog extends Link("cloglog") { - override def link(mu: Double): Double = GD(0.0, 1.0).icdf(mu) + override def link(mu: Double): Double = math.log(-1.0 * math.log(1 - mu)) - override def deriv(mu: Double): Double = 1.0 / GD(0.0, 1.0).pdf(GD(0.0, 1.0).icdf(mu)) + override def deriv(mu: Double): Double = 1.0 / ((mu - 1.0) * math.log(1.0 - mu)) - override def unlink(eta: Double): Double = GD(0.0, 1.0).cdf(eta) -} + override def unlink(eta: Double): Double = 1.0 - math.exp(-1.0 * math.exp(eta)) + } -private[ml] object CLogLog extends Link { + private[ml] object Sqrt extends Link("sqrt") { - override def link(mu: Double): Double = math.log(-1.0 * math.log(1 - mu)) + override def link(mu: Double): Double = math.sqrt(mu) - override def deriv(mu: Double): Double = 1.0 / ((mu - 1.0) * math.log(1.0 - mu)) + override def deriv(mu: Double): Double = 1.0 / (2.0 * math.sqrt(mu)) - override def unlink(eta: Double): Double = 1.0 - math.exp(-1.0 * math.exp(eta)) + override def unlink(eta: Double): Double = math.pow(eta, 2.0) + } } -private[ml] object Sqrt extends Link { - - override def link(mu: Double): Double = math.sqrt(mu) +/** + * :: Experimental :: + * Model produced by [[GeneralizedLinearRegression]]. + */ +@Experimental +@Since("2.0.0") +class GeneralizedLinearRegressionModel private[ml] ( + @Since("2.0.0") override val uid: String, + @Since("2.0.0") val coefficients: Vector, + @Since("2.0.0") val intercept: Double) + extends RegressionModel[Vector, GeneralizedLinearRegressionModel] + with GeneralizedLinearRegressionBase { - override def deriv(mu: Double): Double = 1.0 / (2.0 * math.sqrt(mu)) + override protected def predict(features: Vector): Double = { + val eta = BLAS.dot(features, coefficients) + intercept + familyAndLink.fitted(eta) + } - override def unlink(eta: Double): Double = math.pow(eta, 2.0) + @Since("2.0.0") + override def copy(extra: ParamMap): GeneralizedLinearRegressionModel = { + copyValues(new GeneralizedLinearRegressionModel(uid, coefficients, intercept), extra) + .setParent(parent) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index e253f25c0ea65..0dfe733df4692 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -163,8 +163,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String }.first() val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - if (($(solver) == "auto" && $(elasticNetParam) == 0.0 && numFeatures <= 4096) || - $(solver) == "normal") { + if (($(solver) == "auto" && $(elasticNetParam) == 0.0 && + numFeatures <= WeightedLeastSquares.MaxNumFeatures) || $(solver) == "normal") { require($(elasticNetParam) == 0.0, "Only L2 regularization can be used when normal " + "solver is used.'") // For low dimensional data, WeightedLeastSquares is more efficiently since the diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index abfaad57d7ba8..a45c539576086 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -192,6 +192,8 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark Vectors.dense(0.0, 2.3010179, 0.8198976), Vectors.dense(2.4108902, 2.2130248, 0.6086152)) + import GeneralizedLinearRegression._ + var idx = 0 for ((link, dataset) <- Seq(("identity", datasetGaussianIdentity), ("log", datasetGaussianLog), ("inverse", datasetGaussianInverse))) { @@ -202,7 +204,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~== expected(idx) absTol 1e-4) - val familyLink = Gaussian(link) + val familyLink = new FamilyAndLink(Gaussian, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept @@ -254,6 +256,8 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark Vectors.dense(0.0, -0.2832198, 0.8434144, -0.2524727, -0.5293452), Vectors.dense(1.5063590, -0.4038015, 0.6133664, -0.2687882, -0.5541758)) + import GeneralizedLinearRegression._ + var idx = 0 for ((link, dataset) <- Seq(("logit", datasetBinomial), ("probit", datasetBinomial), ("cloglog", datasetBinomial))) { @@ -265,7 +269,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark model.coefficients(2), model.coefficients(3)) assert(actual ~== expected(idx) absTol 1e-4) - val familyLink = Binomial(link) + val familyLink = new FamilyAndLink(Binomial, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept @@ -319,6 +323,8 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark Vectors.dense(0.0, 2.2958947, 0.8090515), Vectors.dense(2.5000480, 2.1999972, 0.5999968)) + import GeneralizedLinearRegression._ + var idx = 0 for ((link, dataset) <- Seq(("log", datasetPoissonLog), ("identity", datasetPoissonIdentity), ("sqrt", datasetPoissonSqrt))) { @@ -329,7 +335,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~== expected(idx) absTol 1e-4) - val familyLink = Poisson(link) + val familyLink = new FamilyAndLink(Poisson, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept @@ -383,6 +389,8 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark Vectors.dense(0.0, 0.22958970, 0.08091066), Vectors.dense(0.25003210, 0.21996957, 0.06000215)) + import GeneralizedLinearRegression._ + var idx = 0 for ((link, dataset) <- Seq(("inverse", datasetGammaInverse), ("identity", datasetGammaIdentity), ("log", datasetGammaLog))) { @@ -393,7 +401,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~== expected(idx) absTol 1e-4) - val familyLink = Gamma(link) + val familyLink = new FamilyAndLink(Gamma, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept From 40bfd09032635b8b62a0841bac9ef854f4cac169 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 23 Feb 2016 18:19:36 +0800 Subject: [PATCH 08/17] Better documents --- .../GeneralizedLinearRegression.scala | 63 ++++++++++--------- .../GeneralizedLinearRegressionSuite.scala | 1 - 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 42adf0e20d7d4..645094f52e3c7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -43,11 +43,13 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam * Param for the name of family which is a description of the error distribution * to be used in the model. * Supported options: "gaussian", "binomial", "poisson" and "gamma". + * Default is "gaussian". * @group param */ @Since("2.0.0") final val family: Param[String] = new Param(this, "family", - "the name of family which is a description of the error distribution to be used in the model", + "The name of family which is a description of the error distribution to be used in the " + + "model. Supported options: gaussian(default), binomial, poisson and gamma.", ParamValidators.inArray[String](GeneralizedLinearRegression.supportedFamilyNames.toArray)) /** @group getParam */ @@ -55,12 +57,16 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getFamily: String = $(family) /** - * Param for the name of the model link function. + * Param for the name of the model link function which provides the relationship + * between the linear predictor and the mean of the distribution function. * Supported options: "identity", "log", "inverse", "logit", "probit", "cloglog" and "sqrt". * @group param */ @Since("2.0.0") - final val link: Param[String] = new Param(this, "link", "the name of the model link function", + final val link: Param[String] = new Param(this, "link", "The name of the model link function " + + "which provides the relationship between the linear predictor and the mean of the " + + "distribution function. Supported options: identity, log, inverse, logit, probit, cloglog " + + "and sqrt.", ParamValidators.inArray[String](GeneralizedLinearRegression.supportedLinkNames.toArray)) /** @group getParam */ @@ -74,6 +80,9 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam @Since("2.0.0") override def validateParams(): Unit = { + if ($(solver) == "irls") { + setDefault(maxIter -> 25) + } if (isDefined(link)) { import GeneralizedLinearRegression._ require(supportedFamilyAndLinkParis.contains(familyObj -> linkObj), @@ -87,8 +96,15 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam * :: Experimental :: * * Fit a Generalized Linear Model ([[https://en.wikipedia.org/wiki/Generalized_linear_model]]) - * specified by giving a symbolic description of the linear predictor and - * a description of the error distribution. + * specified by giving a symbolic description of the linear predictor (link function) and + * a description of the error distribution (family). + * It supports "gaussian", "binomial", "poisson" and "gamma" as family. + * Valid link functions for each family is listed below. The first link function of each family + * is the default one. + * - "gaussian" -> "identity", "log", "inverse" + * - "binomial" -> "logit", "probit", "cloglog" + * - "poisson" -> "log", "identity", "sqrt" + * - "gamma" -> "inverse", "identity", "log" */ @Experimental @Since("2.0.0") @@ -101,10 +117,12 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val /** * Sets the value of param [[family]]. + * Default is "gaussian". * @group setParam */ @Since("2.0.0") def setFamily(value: String): this.type = set(family, value) + setDefault(family -> "gaussian") /** * Sets the value of param [[link]]. @@ -114,7 +132,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val def setLink(value: String): this.type = set(link, value) /** - * Set if we should fit the intercept. + * Sets if we should fit the intercept. * Default is true. * @group setParam */ @@ -122,16 +140,15 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val def setFitIntercept(value: Boolean): this.type = set(fitIntercept, value) /** - * Set the maximum number of iterations. - * Default is 100. + * Sets the maximum number of iterations. + * Default is 25 if the solver algorithm is "irls". * @group setParam */ @Since("2.0.0") def setMaxIter(value: Int): this.type = set(maxIter, value) - setDefault(maxIter -> 100) /** - * Set the convergence tolerance of iterations. + * Sets the convergence tolerance of iterations. * Smaller value will lead to higher accuracy with the cost of more iterations. * Default is 1E-6. * @group setParam @@ -141,7 +158,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(tol -> 1E-6) /** - * Set the regularization parameter. + * Sets the regularization parameter. * Default is 0.0. * @group setParam */ @@ -150,7 +167,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(regParam -> 0.0) /** - * Sets [[weightCol]]. + * Sets the value of param [[weightCol]]. * If this is not set or empty, we treat all instance weights as 1.0. * Default is empty, so all instances have weight one. * @group setParam @@ -160,7 +177,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(weightCol -> "") /** - * Set the solver algorithm used for optimization. + * Sets the solver algorithm used for optimization. * Currently only support "irls" which is also the default solver. * @group setParam */ @@ -239,24 +256,10 @@ private[ml] object GeneralizedLinearRegression { val epsilon: Double = 1E-16 /** - * One wrapper of family and link instance to be used in the model. - * @param family the family instance - * @param link the link instance + * Wrapper of family and link instance. */ private[ml] class FamilyAndLink(val family: Family, var link: Link) extends Serializable { - /** Weights for IRLS steps. */ - def weights(mu: Double): Double = { - val x = family.clean(mu) - 1.0 / (math.pow(this.link.deriv(x), 2.0) * family.variance(x)) - } - - /** The adjusted response variable. */ - def adjusted(y: Double, mu: Double, eta: Double): Double = { - val x = family.clean(mu) - eta + (y - x) * link.deriv(x) - } - /** Linear predictor based on given mu. */ def predict(mu: Double): Double = link.link(family.clean(mu)) @@ -267,8 +270,8 @@ private[ml] object GeneralizedLinearRegression { (instance: Instance, model: WeightedLeastSquaresModel) => { val eta = model.predict(instance.features) val mu = fitted(eta) - val z = adjusted(instance.label, mu, eta) - val w = weights(mu) * instance.weight + val z = eta + (instance.label - mu) * link.deriv(mu) + val w = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) (z, w) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index a45c539576086..334f63c7b0648 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -128,7 +128,6 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark assert(glr.getFeaturesCol === "features") assert(glr.getPredictionCol === "prediction") assert(glr.getFitIntercept) - assert(glr.getMaxIter === 100) assert(glr.getTol === 1E-6) assert(glr.getWeightCol === "") assert(glr.getRegParam === 0.0) From 10bef717ad2d433b227aab58bb3bab8c11f67ab7 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 24 Feb 2016 12:39:54 +0800 Subject: [PATCH 09/17] Add test with regularization against glmnet. --- .../GeneralizedLinearRegression.scala | 3 +- .../GeneralizedLinearRegressionSuite.scala | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 645094f52e3c7..500186daa5a21 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -201,7 +201,8 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val Instance(label, weight, features) } - if ($(family) == "gaussian" && $(link) == "identity") { + import GeneralizedLinearRegression._ + if (familyObj == Gaussian && linkObj == Identity) { val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), standardizeFeatures = true, standardizeLabel = true) val wlsModel = optimizer.fit(instances) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 334f63c7b0648..0c8356c106d5c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -216,6 +216,50 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark } } + test("generalized linear regression: gaussian family against glmnet") { + /* + R code: + library(glmnet) + data <- read.csv("path", header=FALSE) + label = data$V1 + features = as.matrix(data.frame(data$V2, data$V3)) + for (intercept in c(FALSE, TRUE)) { + for (lambda in c(0.0, 0.1, 1.0)) { + model <- glmnet(features, label, family="gaussian", intercept=intercept, + lambda=lambda, alpha=0, thresh=1E-14) + print(as.vector(coef(model))) + } + } + + [1] 0.0000000 2.2961005 0.8087932 + [1] 0.0000000 2.2130368 0.8309556 + [1] 0.0000000 1.7176137 0.9610657 + [1] 2.5002642 2.2000403 0.5999485 + [1] 3.1106389 2.0935142 0.5712711 + [1] 6.7597127 1.4581054 0.3994266 + */ + + val expected = Seq( + Vectors.dense(0.0, 2.2961005, 0.8087932), + Vectors.dense(0.0, 2.2130368, 0.8309556), + Vectors.dense(0.0, 1.7176137, 0.9610657), + Vectors.dense(2.5002642, 2.2000403, 0.5999485), + Vectors.dense(3.1106389, 2.0935142, 0.5712711), + Vectors.dense(6.7597127, 1.4581054, 0.3994266)) + + var idx = 0 + for (fitIntercept <- Seq(false, true); + regParam <- Seq(0.0, 0.1, 1.0)) { + val trainer = new GeneralizedLinearRegression().setFamily("gaussian") + .setFitIntercept(fitIntercept).setRegParam(regParam) + val model = trainer.fit(datasetGaussianIdentity) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + + idx += 1 + } + } + test("generalized linear regression: binomial family") { /* R code: From 71e12f1d041999ad7ca4163d4b18ceb3980cde6a Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 24 Feb 2016 15:09:40 +0800 Subject: [PATCH 10/17] update doc --- .../GeneralizedLinearRegression.scala | 85 +++++++++++-------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 500186daa5a21..5e8aac929d803 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -57,16 +57,16 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getFamily: String = $(family) /** - * Param for the name of the model link function which provides the relationship + * Param for the name of link function which provides the relationship * between the linear predictor and the mean of the distribution function. * Supported options: "identity", "log", "inverse", "logit", "probit", "cloglog" and "sqrt". * @group param */ @Since("2.0.0") - final val link: Param[String] = new Param(this, "link", "The name of the model link function " + + final val link: Param[String] = new Param(this, "link", "The name of link function " + "which provides the relationship between the linear predictor and the mean of the " + - "distribution function. Supported options: identity, log, inverse, logit, probit, cloglog " + - "and sqrt.", + "distribution function. Supported options: identity, log, inverse, logit, probit, " + + "cloglog and sqrt.", ParamValidators.inArray[String](GeneralizedLinearRegression.supportedLinkNames.toArray)) /** @group getParam */ @@ -84,10 +84,9 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam setDefault(maxIter -> 25) } if (isDefined(link)) { - import GeneralizedLinearRegression._ - require(supportedFamilyAndLinkParis.contains(familyObj -> linkObj), - s"Generalized Linear Regression with ${$(family)} family does not support ${$(link)} " + - s"link function.") + require(GeneralizedLinearRegression.supportedFamilyAndLinkParis.contains( + familyObj -> linkObj), s"Generalized Linear Regression with ${$(family)} family " + + s"does not support ${$(link)} link function.") } } } @@ -186,12 +185,13 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(solver -> "irls") override protected def train(dataset: DataFrame): GeneralizedLinearRegressionModel = { - val numFeatures = dataset.select(col($(featuresCol))).limit(1).map { - case Row(features: Vector) => features.size - }.first() + val numFeatures = dataset.select(col($(featuresCol))).limit(1) + .map { case Row(features: Vector) => + features.size + }.first() if (numFeatures > WeightedLeastSquares.MaxNumFeatures) { val msg = "Currently, GeneralizedLinearRegression only supports number of features" + - s" <= 4096. Found $numFeatures in the input dataset." + s" <= ${WeightedLeastSquares.MaxNumFeatures}. Found $numFeatures in the input dataset." throw new SparkException(msg) } @@ -201,8 +201,9 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val Instance(label, weight, features) } - import GeneralizedLinearRegression._ - if (familyObj == Gaussian && linkObj == Identity) { + if (familyObj == GeneralizedLinearRegression.Gaussian && + linkObj == GeneralizedLinearRegression.Identity) { + // TODO: Make standardizeFeatures and standardizeLabel configurable. val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), standardizeFeatures = true, standardizeLabel = true) val wlsModel = optimizer.fit(instances) @@ -212,19 +213,10 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val return model } - val newInstances = instances.map { instance => - val mu = familyObj.initialize(instance.label, instance.weight) - val eta = familyAndLink.predict(mu) - Instance(eta, instance.weight, instance.features) - } - - val initialModel = new WeightedLeastSquares($(fitIntercept), $(regParam), - standardizeFeatures = true, standardizeLabel = true) - .fit(newInstances) - + // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS). + val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam)) val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol)) - val irlsModel = optimizer.fit(instances) val model = copyValues( @@ -240,7 +232,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val @Since("2.0.0") private[ml] object GeneralizedLinearRegression { - /** Set of family and link pairs that GeneralizedLinearRegression supports */ + /** Set of family and link pairs that GeneralizedLinearRegression supports. */ lazy val supportedFamilyAndLinkParis = Set( Gaussian -> Identity, Gaussian -> Log, Gaussian -> Inverse, Binomial -> Logit, Binomial -> Probit, Binomial -> CLogLog, @@ -248,16 +240,16 @@ private[ml] object GeneralizedLinearRegression { Gamma -> Inverse, Gamma -> Identity, Gamma -> Log ) - /** Set of families that GeneralizedLinearRegression supports */ + /** Set of family names that GeneralizedLinearRegression supports. */ lazy val supportedFamilyNames = supportedFamilyAndLinkParis.map(_._1.name) - /** Set of links that GeneralizedLinearRegression supports */ + /** Set of link names that GeneralizedLinearRegression supports. */ lazy val supportedLinkNames = supportedFamilyAndLinkParis.map(_._2.name) val epsilon: Double = 1E-16 /** - * Wrapper of family and link instance. + * Wrapper of family and link combination used in the model. */ private[ml] class FamilyAndLink(val family: Family, var link: Link) extends Serializable { @@ -267,20 +259,42 @@ private[ml] object GeneralizedLinearRegression { /** Fitted value based on linear predictor eta. */ def fitted(eta: Double): Double = family.clean(link.unlink(eta)) + /** + * Get the initial guess model for [[IterativelyReweightedLeastSquares]]. + */ + def initialize( + instances: RDD[Instance], + fitIntercept: Boolean, + regParam: Double): WeightedLeastSquaresModel = { + val newInstances = instances.map { instance => + val mu = family.initialize(instance.label, instance.weight) + val eta = predict(mu) + Instance(eta, instance.weight, instance.features) + } + val initialModel = new WeightedLeastSquares(fitIntercept, regParam, + standardizeFeatures = true, standardizeLabel = true) + .fit(newInstances) + initialModel + } + + /** + * The reweight function used to update offsets and weights + * at each iteration of [[IterativelyReweightedLeastSquares]]. + */ val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double) = { (instance: Instance, model: WeightedLeastSquaresModel) => { val eta = model.predict(instance.features) val mu = fitted(eta) - val z = eta + (instance.label - mu) * link.deriv(mu) - val w = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) - (z, w) + val offset = eta + (instance.label - mu) * link.deriv(mu) + val weight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) + (offset, weight) } } } /** * A description of the error distribution to be used in the model. - * @param name the name of the family + * @param name the name of the family. */ private[ml] abstract class Family(val name: String) extends Serializable { @@ -422,7 +436,7 @@ private[ml] object GeneralizedLinearRegression { * A description of the link function to be used in the model. * The link function provides the relationship between the linear predictor * and the mean of the distribution function. - * @param name the name of link function + * @param name the name of link function. */ private[ml] abstract class Link(val name: String) extends Serializable { @@ -440,7 +454,8 @@ private[ml] object GeneralizedLinearRegression { /** * Gets the [[Link]] object from its name. - * @param name link name: "identity", "logit", "log", "inverse", "probit", "cloglog" or "sqrt". + * @param name link name: "identity", "logit", "log", + * "inverse", "probit", "cloglog" or "sqrt". */ def fromName(name: String): Link = { name match { From 8c84052006f392867090338e48fbf8f07e48e20d Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 24 Feb 2016 15:19:55 +0800 Subject: [PATCH 11/17] rename clean() to project() --- .../regression/GeneralizedLinearRegression.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 5e8aac929d803..f923aa52b2a76 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -254,10 +254,10 @@ private[ml] object GeneralizedLinearRegression { private[ml] class FamilyAndLink(val family: Family, var link: Link) extends Serializable { /** Linear predictor based on given mu. */ - def predict(mu: Double): Double = link.link(family.clean(mu)) + def predict(mu: Double): Double = link.link(family.project(mu)) /** Fitted value based on linear predictor eta. */ - def fitted(eta: Double): Double = family.clean(link.unlink(eta)) + def fitted(eta: Double): Double = family.project(link.unlink(eta)) /** * Get the initial guess model for [[IterativelyReweightedLeastSquares]]. @@ -271,6 +271,7 @@ private[ml] object GeneralizedLinearRegression { val eta = predict(mu) Instance(eta, instance.weight, instance.features) } + // TODO: Make standardizeFeatures and standardizeLabel configurable. val initialModel = new WeightedLeastSquares(fitIntercept, regParam, standardizeFeatures = true, standardizeLabel = true) .fit(newInstances) @@ -308,7 +309,7 @@ private[ml] object GeneralizedLinearRegression { def variance(mu: Double): Double /** Trim the fitted value so that it will be in valid range. */ - def clean(mu: Double): Double = mu + def project(mu: Double): Double = mu } private[ml] object Family { @@ -339,7 +340,7 @@ private[ml] object GeneralizedLinearRegression { def variance(mu: Double): Double = 1.0 - override def clean(mu: Double): Double = { + override def project(mu: Double): Double = { if (mu.isNegInfinity) { Double.MinValue } else if (mu.isPosInfinity) { @@ -367,7 +368,7 @@ private[ml] object GeneralizedLinearRegression { override def variance(mu: Double): Double = mu * (1.0 - mu) - override def clean(mu: Double): Double = { + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon } else if (mu > 1.0 - epsilon) { @@ -394,7 +395,7 @@ private[ml] object GeneralizedLinearRegression { override def variance(mu: Double): Double = mu - override def clean(mu: Double): Double = { + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon } else if (mu.isInfinity) { @@ -421,7 +422,7 @@ private[ml] object GeneralizedLinearRegression { override def variance(mu: Double): Double = math.pow(mu, 2.0) - override def clean(mu: Double): Double = { + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon } else if (mu.isInfinity) { From aa89fdcb837e12481bed23f781925ea6e8f6acbe Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 24 Feb 2016 15:25:45 +0800 Subject: [PATCH 12/17] Fix typos --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 6 +++++- .../ml/regression/GeneralizedLinearRegressionSuite.scala | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index f923aa52b2a76..4a40ea8890e19 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -75,7 +75,11 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam import GeneralizedLinearRegression._ protected lazy val familyObj = Family.fromName($(family)) - protected lazy val linkObj = if (isDefined(link)) Link.fromName($(link)) else familyObj.defaultLink + protected lazy val linkObj = if (isDefined(link)) { + Link.fromName($(link)) + } else { + familyObj.defaultLink + } protected lazy val familyAndLink = new FamilyAndLink(familyObj, linkObj) @Since("2.0.0") diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 0c8356c106d5c..58713c9b732af 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -149,7 +149,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark assert(model.getLink === "identity") } - test("generalized linear regression: gaussian family") { + test("generalized linear regression: gaussian family against glm") { /* R code: f1 <- data$V1 ~ data$V2 + data$V3 - 1 @@ -260,7 +260,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark } } - test("generalized linear regression: binomial family") { + test("generalized linear regression: binomial family against glm") { /* R code: f1 <- data$V1 ~ data$V2 + data$V3 + data$V4 + data$V5 - 1 @@ -325,7 +325,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark } } - test("generalized linear regression: poisson family") { + test("generalized linear regression: poisson family against glm") { /* R code: f1 <- data$V1 ~ data$V2 + data$V3 - 1 @@ -391,7 +391,7 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark } } - test("generalized linear regression: gamma family") { + test("generalized linear regression: gamma family against glm") { /* R code: f1 <- data$V1 ~ data$V2 + data$V3 - 1 From 2ebcef728315ad6b03d48c7f7e7f504e5e193748 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 25 Feb 2016 18:51:26 +0800 Subject: [PATCH 13/17] Address comments --- .../spark/ml/optim/WeightedLeastSquares.scala | 4 ++ .../GeneralizedLinearRegression.scala | 46 ++++++++++++------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index d35699b5d764d..fdf1998a7a7d9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -156,6 +156,10 @@ private[ml] class WeightedLeastSquares( private[ml] object WeightedLeastSquares { + /** + * In order to take the normal equation approach efficiently, [[WeightedLeastSquares]] + * only supports the number of features is no more than 4096. + */ val MaxNumFeatures: Int = 4096 /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 4a40ea8890e19..f8e2999c45e75 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -74,13 +74,6 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getLink: String = $(link) import GeneralizedLinearRegression._ - protected lazy val familyObj = Family.fromName($(family)) - protected lazy val linkObj = if (isDefined(link)) { - Link.fromName($(link)) - } else { - familyObj.defaultLink - } - protected lazy val familyAndLink = new FamilyAndLink(familyObj, linkObj) @Since("2.0.0") override def validateParams(): Unit = { @@ -88,9 +81,9 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam setDefault(maxIter -> 25) } if (isDefined(link)) { - require(GeneralizedLinearRegression.supportedFamilyAndLinkParis.contains( - familyObj -> linkObj), s"Generalized Linear Regression with ${$(family)} family " + - s"does not support ${$(link)} link function.") + require(supportedFamilyAndLinkPairs.contains( + Family.fromName($(family)) -> Link.fromName($(link))), "Generalized Linear Regression " + + s"with ${$(family)} family does not support ${$(link)} link function.") } } } @@ -115,6 +108,8 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val extends Regressor[Vector, GeneralizedLinearRegression, GeneralizedLinearRegressionModel] with GeneralizedLinearRegressionBase with Logging { + import GeneralizedLinearRegression._ + @Since("2.0.0") def this() = this(Identifiable.randomUID("glm")) @@ -125,7 +120,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val */ @Since("2.0.0") def setFamily(value: String): this.type = set(family, value) - setDefault(family -> "gaussian") + setDefault(family -> Gaussian.name) /** * Sets the value of param [[link]]. @@ -189,6 +184,14 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val setDefault(solver -> "irls") override protected def train(dataset: DataFrame): GeneralizedLinearRegressionModel = { + val familyObj = Family.fromName($(family)) + val linkObj = if (isDefined(link)) { + Link.fromName($(link)) + } else { + familyObj.defaultLink + } + val familyAndLink = new FamilyAndLink(familyObj, linkObj) + val numFeatures = dataset.select(col($(featuresCol))).limit(1) .map { case Row(features: Vector) => features.size @@ -205,8 +208,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val Instance(label, weight, features) } - if (familyObj == GeneralizedLinearRegression.Gaussian && - linkObj == GeneralizedLinearRegression.Identity) { + if (familyObj == Gaussian && linkObj == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), standardizeFeatures = true, standardizeLabel = true) @@ -237,7 +239,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val private[ml] object GeneralizedLinearRegression { /** Set of family and link pairs that GeneralizedLinearRegression supports. */ - lazy val supportedFamilyAndLinkParis = Set( + lazy val supportedFamilyAndLinkPairs = Set( Gaussian -> Identity, Gaussian -> Log, Gaussian -> Inverse, Binomial -> Logit, Binomial -> Probit, Binomial -> CLogLog, Poisson -> Log, Poisson -> Identity, Poisson -> Sqrt, @@ -245,17 +247,17 @@ private[ml] object GeneralizedLinearRegression { ) /** Set of family names that GeneralizedLinearRegression supports. */ - lazy val supportedFamilyNames = supportedFamilyAndLinkParis.map(_._1.name) + lazy val supportedFamilyNames = supportedFamilyAndLinkPairs.map(_._1.name) /** Set of link names that GeneralizedLinearRegression supports. */ - lazy val supportedLinkNames = supportedFamilyAndLinkParis.map(_._2.name) + lazy val supportedLinkNames = supportedFamilyAndLinkPairs.map(_._2.name) val epsilon: Double = 1E-16 /** * Wrapper of family and link combination used in the model. */ - private[ml] class FamilyAndLink(val family: Family, var link: Link) extends Serializable { + private[ml] class FamilyAndLink(val family: Family, val link: Link) extends Serializable { /** Linear predictor based on given mu. */ def predict(mu: Double): Double = link.link(family.project(mu)) @@ -552,6 +554,16 @@ class GeneralizedLinearRegressionModel private[ml] ( extends RegressionModel[Vector, GeneralizedLinearRegressionModel] with GeneralizedLinearRegressionBase { + import GeneralizedLinearRegression._ + + lazy val familyObj = Family.fromName($(family)) + lazy val linkObj = if (isDefined(link)) { + Link.fromName($(link)) + } else { + familyObj.defaultLink + } + lazy val familyAndLink = new FamilyAndLink(familyObj, linkObj) + override protected def predict(features: Vector): Double = { val eta = BLAS.dot(features, coefficients) + intercept familyAndLink.fitted(eta) From c05a94899c39bfa9ede9071bd6db6a937b33cf83 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Fri, 26 Feb 2016 17:45:21 +0800 Subject: [PATCH 14/17] Rename MaxNumFeatures to MAX_NUM_FEATURES --- .../org/apache/spark/ml/optim/WeightedLeastSquares.scala | 6 +++--- .../spark/ml/regression/GeneralizedLinearRegression.scala | 4 ++-- .../org/apache/spark/ml/regression/LinearRegression.scala | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index fdf1998a7a7d9..55b7510656643 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -160,7 +160,7 @@ private[ml] object WeightedLeastSquares { * In order to take the normal equation approach efficiently, [[WeightedLeastSquares]] * only supports the number of features is no more than 4096. */ - val MaxNumFeatures: Int = 4096 + val MAX_NUM_FEATURES: Int = 4096 /** * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]]. @@ -180,8 +180,8 @@ private[ml] object WeightedLeastSquares { private var aaSum: DenseVector = _ private def init(k: Int): Unit = { - require(k <= MaxNumFeatures, "In order to take the normal equation approach efficiently, " + - s"we set the max number of features to $MaxNumFeatures but got $k.") + require(k <= MAX_NUM_FEATURES, "In order to take the normal equation approach efficiently, " + + s"we set the max number of features to $MAX_NUM_FEATURES but got $k.") this.k = k triK = k * (k + 1) / 2 count = 0L diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index f8e2999c45e75..8451f624d7a60 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -196,9 +196,9 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val .map { case Row(features: Vector) => features.size }.first() - if (numFeatures > WeightedLeastSquares.MaxNumFeatures) { + if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) { val msg = "Currently, GeneralizedLinearRegression only supports number of features" + - s" <= ${WeightedLeastSquares.MaxNumFeatures}. Found $numFeatures in the input dataset." + s" <= ${WeightedLeastSquares.MAX_NUM_FEATURES}. Found $numFeatures in the input dataset." throw new SparkException(msg) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 0dfe733df4692..56dc36e8e026b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -164,7 +164,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) if (($(solver) == "auto" && $(elasticNetParam) == 0.0 && - numFeatures <= WeightedLeastSquares.MaxNumFeatures) || $(solver) == "normal") { + numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == "normal") { require($(elasticNetParam) == 0.0, "Only L2 regularization can be used when normal " + "solver is used.'") // For low dimensional data, WeightedLeastSquares is more efficiently since the From 314b562f315723a7117851289c8f5b6e1b16a6ac Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 1 Mar 2016 11:23:05 +0800 Subject: [PATCH 15/17] Fix test issues --- .../GeneralizedLinearRegressionSuite.scala | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 58713c9b732af..7f3cb3ebbd328 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -52,19 +52,19 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark datasetGaussianIdentity = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gaussian", link = "identity"), 2)) datasetGaussianLog = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gaussian", link = "log"), 2)) datasetGaussianInverse = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gaussian", link = "inverse"), 2)) datasetBinomial = { @@ -74,45 +74,46 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) val testData = - generateMultinomialLogisticInput(coefficients, xMean, xVariance, true, nPoints, seed) + generateMultinomialLogisticInput(coefficients, xMean, xVariance, + addIntercept = true, nPoints, seed) - sqlContext.createDataFrame(sc.parallelize(testData, 4)) + sqlContext.createDataFrame(sc.parallelize(testData, 2)) } datasetPoissonLog = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "poisson", link = "log"), 2)) datasetPoissonIdentity = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "poisson", link = "identity"), 2)) datasetPoissonSqrt = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "poisson", link = "sqrt"), 2)) datasetGammaInverse = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gamma", link = "inverse"), 2)) datasetGammaIdentity = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gamma", link = "identity"), 2)) datasetGammaLog = sqlContext.createDataFrame( sc.parallelize(generateGeneralizedLinearRegressionInput( intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.01, + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, family = "gamma", link = "log"), 2)) } @@ -132,14 +133,12 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark assert(glr.getWeightCol === "") assert(glr.getRegParam === 0.0) assert(glr.getSolver == "irls") + // TODO: Construct model directly instead of via fitting. val model = glr.setFamily("gaussian").setLink("identity") .fit(datasetGaussianIdentity) // copied model must have the same parent. MLTestingUtils.checkCopy(model) - model.transform(datasetGaussianIdentity) - .select("label", "prediction") - .collect() assert(model.getFeaturesCol === "features") assert(model.getPredictionCol === "prediction") @@ -467,7 +466,7 @@ object GeneralizedLinearRegressionSuite { xVariance: Array[Double], nPoints: Int, seed: Int, - eps: Double, + noiseLevel: Double, family: String, link: String): Seq[LabeledPoint] = { @@ -491,7 +490,7 @@ object GeneralizedLinearRegressionSuite { case "sqrt" => math.pow(eta, 2.0) case "inverse" => 1.0 / eta } - val label = mu + eps * (generator.nextValue() - mean) + val label = mu + noiseLevel * (generator.nextValue() - mean) // Return LabeledPoints with DenseVector LabeledPoint(label, features) } From 31a912cd74cf3dffbf8cc0af8c57b777d49579eb Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 1 Mar 2016 12:26:15 +0800 Subject: [PATCH 16/17] Better error message --- .../GeneralizedLinearRegressionSuite.scala | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 7f3cb3ebbd328..8bfa9855ce4ea 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -200,14 +200,16 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark .setFitIntercept(fitIntercept) val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + assert(actual ~= expected(idx) absTol 1e-4, "Model mismatch: GLM with gaussian family, " + + s"$link link and fitIntercept = $fitIntercept.") val familyLink = new FamilyAndLink(Gaussian, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept val prediction2 = familyLink.fitted(eta) - assert(prediction1 ~== prediction2 relTol 1E-5) + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"gaussian family, $link link and fitIntercept = $fitIntercept.") } idx += 1 @@ -253,7 +255,8 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark .setFitIntercept(fitIntercept).setRegParam(regParam) val model = trainer.fit(datasetGaussianIdentity) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + assert(actual ~= expected(idx) absTol 1e-4, "Model mismatch: GLM with gaussian family, " + + s"fitIntercept = $fitIntercept and regParam = $regParam.") idx += 1 } @@ -309,14 +312,16 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1), model.coefficients(2), model.coefficients(3)) - assert(actual ~== expected(idx) absTol 1e-4) + assert(actual ~= expected(idx) absTol 1e-4, "Model mismatch: GLM with binomial family, " + + s"$link link and fitIntercept = $fitIntercept.") val familyLink = new FamilyAndLink(Binomial, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept val prediction2 = familyLink.fitted(eta) - assert(prediction1 ~== prediction2 relTol 1E-5) + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"binomial family, $link link and fitIntercept = $fitIntercept.") } idx += 1 @@ -375,14 +380,16 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark .setFitIntercept(fitIntercept) val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + assert(actual ~= expected(idx) absTol 1e-4, "Model mismatch: GLM with poisson family, " + + s"$link link and fitIntercept = $fitIntercept.") val familyLink = new FamilyAndLink(Poisson, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept val prediction2 = familyLink.fitted(eta) - assert(prediction1 ~== prediction2 relTol 1E-5) + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"poisson family, $link link and fitIntercept = $fitIntercept.") } idx += 1 @@ -441,14 +448,16 @@ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSpark .setFitIntercept(fitIntercept) val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-4) + assert(actual ~= expected(idx) absTol 1e-4, "Model mismatch: GLM with gamma family, " + + s"$link link and fitIntercept = $fitIntercept.") val familyLink = new FamilyAndLink(Gamma, Link.fromName(link)) model.transform(dataset).select("features", "prediction").collect().foreach { case Row(features: DenseVector, prediction1: Double) => val eta = BLAS.dot(features, model.coefficients) + model.intercept val prediction2 = familyLink.fitted(eta) - assert(prediction1 ~== prediction2 relTol 1E-5) + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"gamma family, $link link and fitIntercept = $fitIntercept.") } idx += 1 From 007a4ec324db273c048ed65fe8942daba0c9d844 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 1 Mar 2016 15:23:43 +0800 Subject: [PATCH 17/17] Use .rdd.map instead of .map --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 8451f624d7a60..a850dfee0a452 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -192,7 +192,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } val familyAndLink = new FamilyAndLink(familyObj, linkObj) - val numFeatures = dataset.select(col($(featuresCol))).limit(1) + val numFeatures = dataset.select(col($(featuresCol))).limit(1).rdd .map { case Row(features: Vector) => features.size }.first() @@ -203,7 +203,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))) + val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd .map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) }