7 changes: 4 additions & 3 deletions R/pkg/NAMESPACE
@@ -12,12 +12,13 @@ export("print.jobj")

# MLlib integration
exportMethods("glm",
"spark.glm",
"predict",
"summary",
"kmeans",
"spark.kmeans",
"fitted",
"naiveBayes",
"survreg")
"spark.naiveBayes",
"spark.survreg")

# Job group lifecycle management methods
export("setJobGroup",
16 changes: 10 additions & 6 deletions R/pkg/R/generics.R
@@ -1177,6 +1177,10 @@ setGeneric("window", function(x, ...) { standardGeneric("window") })
#' @export
setGeneric("year", function(x) { standardGeneric("year") })

#' @rdname spark.glm
#' @export
setGeneric("spark.glm", function(data, formula, ...) { standardGeneric("spark.glm") })

#' @rdname glm
#' @export
setGeneric("glm")
@@ -1189,21 +1193,21 @@ setGeneric("predict", function(object, ...) { standardGeneric("predict") })
#' @export
setGeneric("rbind", signature = "...")

#' @rdname kmeans
#' @rdname spark.kmeans
#' @export
setGeneric("kmeans")
setGeneric("spark.kmeans", function(data, k, ...) { standardGeneric("spark.kmeans") })

#' @rdname fitted
#' @export
setGeneric("fitted")

#' @rdname naiveBayes
#' @rdname spark.naiveBayes
#' @export
setGeneric("naiveBayes", function(formula, data, ...) { standardGeneric("naiveBayes") })
setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("spark.naiveBayes") })

#' @rdname survreg
#' @rdname spark.survreg
#' @export
setGeneric("survreg", function(formula, data, ...) { standardGeneric("survreg") })
setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })

#' @rdname ml.save
#' @export
155 changes: 97 additions & 58 deletions R/pkg/R/mllib.R
@@ -17,6 +17,14 @@

# mllib.R: Provides methods for MLlib integration

# Integration with R's standard functions.
# Most of MLlib's algorithms are provided in two flavours:
# - specializations of the default R methods (e.g. glm). These methods try to respect
#   the inputs and the outputs of R's methods as far as possible, but some small
#   differences may exist.
# - a set of methods that reflect the arguments of the other languages supported by Spark.
#   These methods carry the `spark.` prefix: spark.glm, spark.kmeans, etc.
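#
# A minimal sketch of the two flavours side by side, reusing the iris example
# from the docs below (assumes an initialized SparkR session; the names below
# are illustrative):
#   sc <- sparkR.init(); sqlContext <- sparkRSQL.init(sc)
#   df <- createDataFrame(sqlContext, iris)
#   m1 <- glm(Sepal_Length ~ Sepal_Width, data = df, family = "gaussian")  # R-compliant
#   m2 <- spark.glm(df, Sepal_Length ~ Sepal_Width, family = "gaussian")   # Spark-flavoured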

#' @title S4 class that represents a generalized linear model
#' @param jobj a Java object reference to the backing Scala GeneralizedLinearRegressionWrapper
#' @export
@@ -39,6 +47,54 @@ setClass("KMeansModel", representation(jobj = "jobj"))

#' Fits a generalized linear model
#'
#' Fits a generalized linear model against a Spark DataFrame.
#'
#' @param data SparkDataFrame for training.
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param family A description of the error distribution and link function to be used in the model.
#' This can be a character string naming a family function, a family function or
#' the result of a call to a family function. Refer to R's family documentation at
#' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
#' @param epsilon Positive convergence tolerance of iterations.
#' @param maxit Integer giving the maximal number of IRLS iterations.
#' @return a fitted generalized linear model
#' @rdname spark.glm
#' @export
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' data(iris)
#' df <- createDataFrame(sqlContext, iris)
#' model <- spark.glm(df, Sepal_Length ~ Sepal_Width, family="gaussian")
#' summary(model)
#' }
setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, family = gaussian, epsilon = 1e-06, maxit = 25) {
if (is.character(family)) {
family <- get(family, mode = "function", envir = parent.frame())
}
if (is.function(family)) {
family <- family()
}
if (is.null(family$family)) {
print(family)
stop("'family' not recognized")
}

formula <- paste(deparse(formula), collapse = "")

jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
"fit", formula, data@sdf, family$family, family$link,
epsilon, as.integer(maxit))
return(new("GeneralizedLinearRegressionModel", jobj = jobj))
})
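
# The family-resolution logic above accepts the same three forms as stats::glm();
# a small sketch (illustrative only, reusing `df` from the example above):
#   spark.glm(df, Sepal_Length ~ Sepal_Width, family = "gaussian")  # character string
#   spark.glm(df, Sepal_Length ~ Sepal_Width, family = gaussian)    # family function
#   spark.glm(df, Sepal_Length ~ Sepal_Width, family = gaussian())  # family object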

#' Fits a generalized linear model (R-compliant).
#'
#' Fits a generalized linear model, similarly to R's glm().
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
@@ -64,23 +120,7 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' }
setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDataFrame"),
function(formula, family = gaussian, data, epsilon = 1e-06, maxit = 25) {
if (is.character(family)) {
family <- get(family, mode = "function", envir = parent.frame())
}
if (is.function(family)) {
family <- family()
}
if (is.null(family$family)) {
print(family)
stop("'family' not recognized")
}

formula <- paste(deparse(formula), collapse = "")

jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
"fit", formula, data@sdf, family$family, family$link,
epsilon, as.integer(maxit))
return(new("GeneralizedLinearRegressionModel", jobj = jobj))
spark.glm(data, formula, family, epsilon, maxit)
})

#' Get the summary of a generalized linear model
@@ -188,7 +228,7 @@ setMethod("predict", signature(object = "GeneralizedLinearRegressionModel"),
#' @export
#' @examples
#' \dontrun{
#' model <- naiveBayes(y ~ x, trainingData)
#' model <- spark.naiveBayes(trainingData, y ~ x)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#'}
Expand All @@ -208,7 +248,7 @@ setMethod("predict", signature(object = "NaiveBayesModel"),
#' @export
#' @examples
#' \dontrun{
#' model <- naiveBayes(y ~ x, trainingData)
#' model <- spark.naiveBayes(trainingData, y ~ x)
#' summary(model)
#'}
setMethod("summary", signature(object = "NaiveBayesModel"),
Expand All @@ -230,23 +270,23 @@ setMethod("summary", signature(object = "NaiveBayesModel"),
#'
#' Fit a k-means model, similarly to R's kmeans().
#'
#' @param x SparkDataFrame for training
#' @param centers Number of centers
#' @param iter.max Maximum iteration number
#' @param algorithm Algorithm choosen to fit the model
#' @param data SparkDataFrame for training
#' @param k Number of centers
#' @param maxIter Maximum iteration number
#' @param initializationMode Algorithm chosen to fit the model
#' @return A fitted k-means model
#' @rdname kmeans
#' @rdname spark.kmeans
#' @export
#' @examples
#' \dontrun{
#' model <- kmeans(x, centers = 2, algorithm="random")
#' model <- spark.kmeans(data, k = 2, initializationMode="random")
#' }
setMethod("kmeans", signature(x = "SparkDataFrame"),
function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
columnNames <- as.array(colnames(x))
algorithm <- match.arg(algorithm)
jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", x@sdf,
centers, iter.max, algorithm, columnNames)
setMethod("spark.kmeans", signature(data = "SparkDataFrame"),
function(data, k, maxIter = 10, initializationMode = c("random", "k-means||")) {
columnNames <- as.array(colnames(data))
initializationMode <- match.arg(initializationMode)
jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf,
k, maxIter, initializationMode, columnNames)
return(new("KMeansModel", jobj = jobj))
})
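
# A minimal usage sketch for the new signature, following the examples in the
# docs above (`trainingData` and `testData` are placeholder SparkDataFrames
# with numeric feature columns):
#   model <- spark.kmeans(trainingData, k = 2, maxIter = 10, initializationMode = "random")
#   summary(model)
#   showDF(predict(model, testData))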

@@ -261,7 +301,7 @@ setMethod("kmeans", signature(x = "SparkDataFrame"),
#' @export
#' @examples
#' \dontrun{
#' model <- kmeans(trainingData, 2)
#' model <- spark.kmeans(trainingData, 2)
#' fitted.model <- fitted(model)
#' showDF(fitted.model)
#'}
@@ -288,7 +328,7 @@ setMethod("fitted", signature(object = "KMeansModel"),
#' @export
#' @examples
#' \dontrun{
#' model <- kmeans(trainingData, 2)
#' model <- spark.kmeans(trainingData, 2)
#' summary(model)
#' }
setMethod("summary", signature(object = "KMeansModel"),
@@ -322,7 +362,7 @@ setMethod("summary", signature(object = "KMeansModel"),
#' @export
#' @examples
#' \dontrun{
#' model <- kmeans(trainingData, 2)
#' model <- spark.kmeans(trainingData, 2)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#' }
@@ -333,30 +373,28 @@ setMethod("predict", signature(object = "KMeansModel"),

#' Fit a Bernoulli naive Bayes model
#'
#' Fit a Bernoulli naive Bayes model, similarly to R package e1071's naiveBayes() while only
#' categorical features are supported. The input should be a SparkDataFrame of observations instead
#' of a contingency table.
#' Fit a Bernoulli naive Bayes model on a Spark DataFrame (only categorical data is supported).
#'
#' @param data SparkDataFrame for training
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param data SparkDataFrame for training
#' @param laplace Smoothing parameter
#' @return a fitted naive Bayes model
#' @rdname naiveBayes
#' @rdname spark.naiveBayes
#' @seealso e1071: \url{https://cran.r-project.org/web/packages/e1071/}
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(sqlContext, infert)
#' model <- naiveBayes(education ~ ., df, laplace = 0)
#' model <- spark.naiveBayes(df, education ~ ., laplace = 0)
#'}
setMethod("naiveBayes", signature(formula = "formula", data = "SparkDataFrame"),
function(formula, data, laplace = 0, ...) {
formula <- paste(deparse(formula), collapse = "")
jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit",
formula, data@sdf, laplace)
return(new("NaiveBayesModel", jobj = jobj))
})
setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, laplace = 0, ...) {
formula <- paste(deparse(formula), collapse = "")
jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit",
formula, data@sdf, laplace)
return(new("NaiveBayesModel", jobj = jobj))
})
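
# An end-to-end sketch with the infert example from the docs above (assumes a
# working sqlContext; predicting back on the training frame for illustration):
#   df <- createDataFrame(sqlContext, infert)
#   model <- spark.naiveBayes(df, education ~ ., laplace = 0)
#   summary(model)
#   showDF(predict(model, df))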

#' Save the Bernoulli naive Bayes model to the input path.
#'
@@ -371,7 +409,7 @@ setMethod("ml.save", signature(object = "NaiveBayesModel", path = "character"),
#' @examples
#' \dontrun{
#' df <- createDataFrame(sqlContext, infert)
#' model <- naiveBayes(education ~ ., df, laplace = 0)
#' model <- spark.naiveBayes(df, education ~ ., laplace = 0)
#' path <- "path/to/model"
#' ml.save(model, path)
#' }
@@ -396,7 +434,7 @@ setMethod("ml.save", signature(object = "NaiveBayesModel", path = "character"),
#' @export
#' @examples
#' \dontrun{
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, trainingData)
#' model <- spark.survreg(trainingData, Surv(futime, fustat) ~ ecog_ps + rx)
#' path <- "path/to/model"
#' ml.save(model, path)
#' }
@@ -446,7 +484,7 @@ setMethod("ml.save", signature(object = "GeneralizedLinearRegressionModel", path
#' @export
#' @examples
#' \dontrun{
#' model <- kmeans(x, centers = 2, algorithm="random")
#' model <- spark.kmeans(x, k = 2, initializationMode="random")
#' path <- "path/to/model"
#' ml.save(model, path)
#' }
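
# A save/load round trip, sketched from the examples above ("path/to/model" is
# a placeholder; ml.load is defined in the hunk below):
#   model <- spark.kmeans(trainingData, k = 2)
#   ml.save(model, "path/to/model")
#   model2 <- ml.load("path/to/model")
#   summary(model2)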
@@ -489,29 +527,30 @@ ml.load <- function(path) {

#' Fit an accelerated failure time (AFT) survival regression model.
#'
#' Fit an accelerated failure time (AFT) survival regression model, similarly to R's survreg().
#' Fit an accelerated failure time (AFT) survival regression model on a Spark DataFrame.
#'
#' @param data SparkDataFrame for training.
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', ':', '+', and '-'.
#' Note that operator '.' is not supported currently.
#' @param data SparkDataFrame for training.
#' @return a fitted AFT survival regression model
#' @rdname survreg
#' @rdname spark.survreg
#' @seealso survival: \url{https://cran.r-project.org/web/packages/survival/}
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(sqlContext, ovarian)
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, df)
#' model <- spark.survreg(df, Surv(futime, fustat) ~ ecog_ps + rx)
#' }
setMethod("survreg", signature(formula = "formula", data = "SparkDataFrame"),
function(formula, data, ...) {
setMethod("spark.survreg", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, ...) {
formula <- paste(deparse(formula), collapse = "")
jobj <- callJStatic("org.apache.spark.ml.r.AFTSurvivalRegressionWrapper",
"fit", formula, data@sdf)
return(new("AFTSurvivalRegressionModel", jobj = jobj))
})
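
# A minimal sketch with the ovarian example from the docs above (assumes a
# working sqlContext; ovarian and Surv ship with the survival package):
#   library(survival)
#   df <- createDataFrame(sqlContext, ovarian)
#   model <- spark.survreg(df, Surv(futime, fustat) ~ ecog_ps + rx)
#   summary(model)
#   showDF(predict(model, df))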


#' Get the summary of an AFT survival regression model
#'
#' Returns the summary of an AFT survival regression model produced by spark.survreg(),
@@ -523,7 +562,7 @@ setMethod("survreg", signature(formula = "formula", data = "SparkDataFrame"),
#' @export
#' @examples
#' \dontrun{
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, trainingData)
#' model <- spark.survreg(trainingData, Surv(futime, fustat) ~ ecog_ps + rx)
#' summary(model)
#' }
setMethod("summary", signature(object = "AFTSurvivalRegressionModel"),
@@ -548,7 +587,7 @@ setMethod("summary", signature(object = "AFTSurvivalRegressionModel"),
#' @export
#' @examples
#' \dontrun{
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, trainingData)
#' model <- spark.survreg(trainingData, Surv(futime, fustat) ~ ecog_ps + rx)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#' }