139 changes: 61 additions & 78 deletions R/pkg/R/mllib.R
@@ -17,10 +17,10 @@

# mllib.R: Provides methods for MLlib integration

#' @title S4 class that represents a PipelineModel
#' @param model A Java object reference to the backing Scala PipelineModel
#' @title S4 class that represents a generalized linear model
#' @param jobj a Java object reference to the backing Scala GeneralizedLinearRegressionWrapper
#' @export
setClass("PipelineModel", representation(model = "jobj"))
setClass("GeneralizedLinearRegressionModel", representation(jobj = "jobj"))

#' @title S4 class that represents a NaiveBayesModel
#' @param jobj a Java object reference to the backing Scala NaiveBayesWrapper
@@ -39,21 +39,18 @@ setClass("KMeansModel", representation(jobj = "jobj"))

#' Fits a generalized linear model
#'
#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
#' Fits a generalized linear model, similarly to R's glm().
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param data DataFrame for training
#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
#' @param lambda Regularization parameter
#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details)
#' @param standardize Whether to standardize features before training
#' @param solver The solver algorithm used for optimization, this can be "l-bfgs", "normal" and
#' "auto". "l-bfgs" denotes Limited-memory BFGS which is a limited-memory
#' quasi-Newton optimization method. "normal" denotes using Normal Equation as an
#' analytical solution to the linear regression problem. The default value is "auto"
#' which means that the solver algorithm is selected automatically.
#' @return a fitted MLlib model
#' @param data DataFrame for training.
#' @param family A description of the error distribution and link function to be used in the model.
#' This can be a character string naming a family function, a family function, or
#' the result of a call to a family function. Refer to the R documentation on
#' family at \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
#' @param epsilon Positive convergence tolerance of iterations.
#' @param maxit Integer giving the maximal number of IRLS iterations.
#' @return a fitted generalized linear model
#' @rdname glm
#' @export
#' @examples
Expand All @@ -64,36 +61,70 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' df <- createDataFrame(sqlContext, iris)
#' model <- glm(Sepal_Length ~ Sepal_Width, df, family="gaussian")
#' summary(model)
#'}
#' }
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
standardize = TRUE, solver = "auto") {
family <- match.arg(family)
function(formula, family = gaussian, data, epsilon = 1e-06, maxit = 25) {
if (is.character(family)) {
family <- get(family, mode = "function", envir = parent.frame())
}
if (is.function(family)) {
family <- family()
}
if (is.null(family$family)) {
print(family)
stop("'family' not recognized")
}

formula <- paste(deparse(formula), collapse = "")
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", formula, data@sdf, family, lambda,
alpha, standardize, solver)
return(new("PipelineModel", model = model))

jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
"fit", formula, data@sdf, family$family, family$link,
epsilon, as.integer(maxit))
return(new("GeneralizedLinearRegressionModel", jobj = jobj))
})
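A minimal usage sketch of the new family handling (assuming a live SparkR session with a sqlContext): as in R's stats::glm, family may be given as a character string, a family function, or the result of calling one, so the three calls below all resolve to the same gaussian family inside the method.

df <- createDataFrame(sqlContext, iris)
model1 <- glm(Sepal_Length ~ Sepal_Width, data = df, family = "gaussian")
model2 <- glm(Sepal_Length ~ Sepal_Width, data = df, family = gaussian)
model3 <- glm(Sepal_Length ~ Sepal_Width, data = df, family = gaussian(link = "identity"))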

#' Make predictions from a model
#' Get the summary of a generalized linear model
#'
#' Makes predictions from a model produced by glm(), similarly to R's predict().
#' Returns the summary of a model produced by glm(), similarly to R's summary().
#'
#' @param object A fitted MLlib model
#' @param object A fitted generalized linear model
#' @return a list with a 'coefficients' matrix of the model's estimated coefficients,
#'   including the intercept when one is fitted
#' @rdname summary
#' @export
#' @examples
#' \dontrun{
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#' }
setMethod("summary", signature(object = "GeneralizedLinearRegressionModel"),
function(object, ...) {
jobj <- object@jobj
features <- callJMethod(jobj, "rFeatures")
coefficients <- callJMethod(jobj, "rCoefficients")
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
})
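A quick sketch of what the new summary() returns, per the method above: a list whose 'coefficients' entry is a one-column "Estimate" matrix with rows named after the features (and "(Intercept)" when one is fitted).

model <- glm(Sepal_Length ~ Sepal_Width, data = df, family = "gaussian")
stats <- summary(model)
stats$coefficients   # matrix with column "Estimate", rows "(Intercept)", "Sepal_Width"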

#' Make predictions from a generalized linear model
#'
#' Makes predictions from a generalized linear model produced by glm(), similarly to R's predict().
#'
#' @param object A fitted generalized linear model
#' @param newData DataFrame for testing
#' @return DataFrame containing predicted values
#' @return DataFrame containing predicted labels in a column named "prediction"
#' @rdname predict
#' @export
#' @examples
#' \dontrun{
#' model <- glm(y ~ x, trainingData)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#'}
setMethod("predict", signature(object = "PipelineModel"),
#' }
setMethod("predict", signature(object = "GeneralizedLinearRegressionModel"),
function(object, newData) {
return(dataFrame(callJMethod(object@model, "transform", newData@sdf)))
return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
})
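And the predict() round trip: collect() brings the "prediction" column back as a local data.frame, which is how the updated tests compare against native glm().

predicted <- predict(model, df)
vals <- collect(select(predicted, "prediction"))
head(vals)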

#' Make predictions from a naive Bayes model
@@ -116,54 +147,6 @@ setMethod("predict", signature(object = "NaiveBayesModel"),
return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
})

#' Get the summary of a model
#'
#' Returns the summary of a model produced by glm(), similarly to R's summary().
#'
#' @param object A fitted MLlib model
#' @return a list with 'devianceResiduals' and 'coefficients' components for gaussian family
#' or a list with 'coefficients' component for binomial family. \cr
#' For gaussian family: the 'devianceResiduals' gives the min/max deviance residuals
#' of the estimation, the 'coefficients' gives the estimated coefficients and their
#' estimated standard errors, t values and p-values. (It only available when model
#' fitted by normal solver.) \cr
#' For binomial family: the 'coefficients' gives the estimated coefficients.
#' See summary.glm for more information. \cr
#' @rdname summary
#' @export
#' @examples
#' \dontrun{
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#'}
setMethod("summary", signature(object = "PipelineModel"),
function(object, ...) {
modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelName", object@model)
features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelFeatures", object@model)
coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelCoefficients", object@model)
if (modelName == "LinearRegressionModel") {
devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelDevianceResiduals", object@model)
devianceResiduals <- matrix(devianceResiduals, nrow = 1)
colnames(devianceResiduals) <- c("Min", "Max")
rownames(devianceResiduals) <- rep("", times = 1)
coefficients <- matrix(coefficients, ncol = 4)
colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
rownames(coefficients) <- unlist(features)
return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
} else if (modelName == "LogisticRegressionModel") {
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
} else {
stop(paste("Unsupported model", modelName, sep = " "))
}
})

#' Get the summary of a naive Bayes model
#'
#' Returns the summary of a naive Bayes model produced by naiveBayes(), similarly to R's summary().
95 changes: 29 additions & 66 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -25,20 +25,21 @@ sc <- sparkR.init()

sqlContext <- sparkRSQL.init(sc)

test_that("glm and predict", {
test_that("formula of glm", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
test <- select(training, "Sepal_Length")
model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
prediction <- predict(model, test)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
# dot minus and intercept vs native glm
model <- glm(Sepal_Width ~ . - Species + 0, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# Test stats::predict is working
x <- rnorm(15)
y <- x + rnorm(15)
expect_equal(length(predict(lm(y ~ x))), 15)
})
# feature interaction vs native glm
model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

test_that("glm should work with long formula", {
# glm should work with long formula
training <- suppressWarnings(createDataFrame(sqlContext, iris))
training$LongLongLongLongLongName <- training$Sepal_Width
training$VeryLongLongLongLonLongName <- training$Sepal_Length
@@ -50,68 +51,30 @@ test_that("glm should work with long formula", {
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("predictions match with native glm", {
test_that("glm and predict", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
# gaussian family
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
vals <- collect(select(predict(model, training), "prediction"))
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("dot minus and intercept vs native glm", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
model <- glm(Sepal_Width ~ . - Species + 0, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("feature interaction vs native glm", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
# poisson family
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
family = poisson(link = identity))
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
data = iris, family = poisson(link = identity)), iris))
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("summary coefficients match with native glm", {
training <- suppressWarnings(createDataFrame(sqlContext, iris))
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
coefs <- unlist(stats$coefficients)
devianceResiduals <- unlist(stats$devianceResiduals)

rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
rCoefs <- unlist(rStats$coefficients)
rDevianceResiduals <- c(-0.95096, 0.72918)

expect_true(all(abs(rCoefs - coefs) < 1e-5))
expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5))
expect_true(all(
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
})

test_that("summary coefficients match with native glm of family 'binomial'", {
df <- suppressWarnings(createDataFrame(sqlContext, iris))
training <- filter(df, df$Species != "setosa")
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
family = "binomial"))
coefs <- as.vector(stats$coefficients[, 1])

rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
family = binomial(link = "logit"))))

expect_true(all(abs(rCoefs - coefs) < 1e-4))
expect_true(all(
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Sepal_Width")))
})

test_that("summary works on base GLM models", {
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
baseSummary <- summary(baseModel)
expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
# Test stats::predict is working
x <- rnorm(15)
y <- x + rnorm(15)
expect_equal(length(predict(lm(y ~ x))), 15)
})

test_that("kmeans", {
79 changes: 79 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.r

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.regression._
import org.apache.spark.sql._

private[r] class GeneralizedLinearRegressionWrapper private (
pipeline: PipelineModel,
val features: Array[String]) {

private val glm: GeneralizedLinearRegressionModel =
pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]

lazy val rCoefficients: Array[Double] = if (glm.getFitIntercept) {
Array(glm.intercept) ++ glm.coefficients.toArray
} else {
glm.coefficients.toArray
}

lazy val rFeatures: Array[String] = if (glm.getFitIntercept) {
Array("(Intercept)") ++ features
} else {
features
}

def transform(dataset: DataFrame): DataFrame = {
pipeline.transform(dataset).drop(glm.getFeaturesCol)
}
}

private[r] object GeneralizedLinearRegressionWrapper {

def fit(
formula: String,
data: DataFrame,
family: String,
link: String,
epsilon: Double,
maxit: Int): GeneralizedLinearRegressionWrapper = {
val rFormula = new RFormula()
.setFormula(formula)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
val schema = rFormulaModel.transform(data).schema
val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol))
.attributes.get
val features = featureAttrs.map(_.name.get)
// assemble and fit the pipeline
val glm = new GeneralizedLinearRegression()
.setFamily(family)
.setLink(link)
.setFitIntercept(rFormula.hasIntercept)
.setTol(epsilon)
.setMaxIter(maxit)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glm))
.fit(data)
new GeneralizedLinearRegressionWrapper(pipeline, features)
}
}
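For reference, the R side reaches this wrapper through the SparkR JVM bridge; a sketch of the internal calls mllib.R makes (not user-facing API; "y ~ x" and the data frames are placeholders):

# fit: static call into the companion object above
jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                    "fit", "y ~ x", data@sdf, "gaussian", "identity", 1e-06, 25L)
# summary() and predict() then call instance methods on the returned wrapper
features <- callJMethod(jobj, "rFeatures")
coefficients <- callJMethod(jobj, "rCoefficients")
predictions <- dataFrame(callJMethod(jobj, "transform", newData@sdf))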