3 changes: 2 additions & 1 deletion R/pkg/DESCRIPTION
@@ -11,7 +11,8 @@ Depends:
R (>= 3.0),
methods,
Suggests:
testthat
testthat,
e1071
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -15,7 +15,8 @@ exportMethods("glm",
"predict",
"summary",
"kmeans",
"fitted")
"fitted",
"naiveBayes")

# Job group lifecycle management methods
export("setJobGroup",
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1175,3 +1175,7 @@ setGeneric("kmeans")
#' @rdname fitted
#' @export
setGeneric("fitted")

#' @rdname naiveBayes
#' @export
setGeneric("naiveBayes", function(formula, data, ...) { standardGeneric("naiveBayes") })
Member:

Could you check manually that naiveBayes from the e1071 package (http://ugrad.stat.ubc.ca/R/library/e1071/html/naiveBayes.html) still works when SparkR is loaded after e1071?

Contributor:

Users need to call e1071::naiveBayes explicitly in that case. Both naiveBayes functions are generics, and the Spark one shares the same first argument type (formula) with the one in e1071, so I don't think we can avoid shadowing the method. I tried different loading orders and confirmed that both can still be used with namespace prefixes.

Contributor Author:

Yes, I have a test to ensure that we can use the prefix to call e1071::naiveBayes().
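
A minimal sketch of that check (hypothetical interactive session; assumes both e1071 and SparkR are installed and a local Spark context can be started):

library(e1071)
library(SparkR)  # SparkR's naiveBayes generic now masks e1071's

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# The masking generic still dispatches to SparkR's method for (formula, DataFrame)
df <- createDataFrame(sqlContext, infert)
sparkModel <- naiveBayes(education ~ ., df, lambda = 1)

# The e1071 implementation remains reachable with an explicit namespace prefix
localModel <- e1071::naiveBayes(education ~ ., data = infert)
predict(localModel, infert[1, ])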

55 changes: 50 additions & 5 deletions R/pkg/R/mllib.R
@@ -42,7 +42,7 @@ setClass("PipelineModel", representation(model = "jobj"))
#' @rdname glm
#' @export
#' @examples
#'\dontrun{
#' \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' data(iris)
@@ -71,7 +71,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram
#' @rdname predict
#' @export
#' @examples
#'\dontrun{
#' \dontrun{
#' model <- glm(y ~ x, trainingData)
#' predicted <- predict(model, testData)
#' showDF(predicted)
@@ -97,7 +97,7 @@ setMethod("predict", signature(object = "PipelineModel"),
#' @rdname summary
#' @export
#' @examples
#'\dontrun{
#' \dontrun{
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#'}
@@ -135,6 +135,19 @@ setMethod("summary", signature(object = "PipelineModel"),
colnames(coefficients) <- unlist(features)
rownames(coefficients) <- 1:k
return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
} else if (modelName == "NaiveBayesModel") {
labels <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getNaiveBayesLabels", object@model)
pi <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getNaiveBayesPi", object@model)
theta <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getNaiveBayesTheta", object@model)
pi <- t(as.matrix(unlist(pi)))
colnames(pi) <- unlist(labels)
theta <- matrix(theta, nrow = length(labels))
rownames(theta) <- unlist(labels)
colnames(theta) <- unlist(features)
return(list(pi = pi, theta = theta))
} else {
stop(paste("Unsupported model", modelName, sep = " "))
}
@@ -152,7 +165,7 @@ setMethod("summary", signature(object = "PipelineModel"),
#' @rdname kmeans
#' @export
#' @examples
#'\dontrun{
#' \dontrun{
#' model <- kmeans(x, centers = 2, algorithm="random")
#'}
setMethod("kmeans", signature(x = "DataFrame"),
@@ -173,7 +186,7 @@ setMethod("kmeans", signature(x = "DataFrame"),
#' @rdname fitted
#' @export
#' @examples
#'\dontrun{
#' \dontrun{
#' model <- kmeans(trainingData, 2)
#' fitted.model <- fitted(model)
#' showDF(fitted.model)
@@ -192,3 +205,35 @@ setMethod("fitted", signature(object = "PipelineModel"),
stop(paste("Unsupported model", modelName, sep = " "))
}
})

#' Fit a naive Bayes model
#'
#' Fit a naive Bayes model, similar to R's naiveBayes() except that the 'subset' and 'na.action'
#' arguments are omitted. Users can preprocess their DataFrame with the 'subset' function and with
#' 'fillna' or 'na.omit', respectively. This interface applies na.omit to remove rows with NA values.
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param data DataFrame for training
#' @param lambda Smoothing parameter
#' @param modelType Either 'multinomial' or 'bernoulli'. Default "multinomial".
#' @return A fitted naive Bayes model.
#' @rdname naiveBayes
#' @export
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' df <- createDataFrame(sqlContext, infert)
#' model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial")
#'}
setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"),
function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) {
data <- na.omit(data)
formula <- paste(deparse(formula), collapse = "")
modelType <- match.arg(modelType)
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes",
formula, data@sdf, lambda, modelType)
return(new("PipelineModel", model = model))
})
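
For context, a minimal sketch of how the new naiveBayes() method and the NaiveBayesModel branch of summary() above might be used from SparkR (hypothetical session; the pi/theta list entries and the rawLabelsPrediction column come from the code in this PR):

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, infert)

# Fit on a string label; RFormula indexes it and IndexToString restores it at predict time
model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial")

# Class priors and conditional probabilities, labelled with class and feature names
s <- summary(model)
s$pi     # 1 x k matrix of class priors, columns named by the original labels
s$theta  # k x p matrix of per-class feature probabilities

# Predictions carry the original string labels in the rawLabelsPrediction column
preds <- predict(model, df)
showDF(select(preds, "rawLabelsPrediction"))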
24 changes: 24 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -141,3 +141,27 @@ test_that("kmeans", {
cluster <- summary.model$cluster
expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
})

test_that("naiveBayes", {
training <- suppressWarnings(createDataFrame(sqlContext, infert))

model <- naiveBayes(education ~ ., data = training, lambda = 1, modelType = "multinomial")
sample <- take(select(predict(model, training), "rawLabelsPrediction"), 1)
expect_equal(typeof(sample$rawLabelsPrediction), "character")
expect_equal(sample$rawLabelsPrediction, "0-5yrs")

# Test summary works on naiveBayes
summary.model <- summary(model)
expect_equal(length(summary.model$pi), 3)
expect_equal(sum(summary.model$pi), 1)
l1 <- summary.model$theta[1, ]
l2 <- summary.model$theta[2, ]
expect_equal(sum(unlist(l1)), 1)
expect_equal(sum(unlist(l2)), 1)

# Test e1071::naiveBayes
if (requireNamespace("e1071", quietly = TRUE)) {
expect_that(m <- e1071::naiveBayes(education ~ ., data = infert), not(throws_error()))
expect_equal(as.character(predict(m, infert[1, ])), "0-5yrs")
}
})
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes}
@@ -104,7 +105,12 @@ class NaiveBayes @Since("1.5.0") (
override protected def train(dataset: DataFrame): NaiveBayesModel = {
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
NaiveBayesModel.fromOld(oldModel, this)
val nbModel = copyValues(NaiveBayesModel.fromOld(oldModel, this))
val attr = AttributeGroup.fromStructField(dataset.schema($(featuresCol))).attributes
if (attr.isDefined) {
nbModel.setFeatureNames(attr.get.map(_.name.getOrElse("NA")))
}
nbModel
}

@Since("1.5.0")
@@ -227,6 +233,21 @@ class NaiveBayesModel private[ml] (

@Since("1.6.0")
override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this)

private var featureNames: Option[Array[String]] = None
Contributor Author:

@mengxr I removed the previous NaiveBayesSummary and added featureNames and labelNames because these two variables need to be accessible from NaiveBayesModel.


private[classification] def setFeatureNames(names: Array[String]): this.type = {
this.featureNames = Some(names)
this
}

private[ml] def getFeatureNames: Array[String] = featureNames match {
case Some(names) => names
case None =>
throw new SparkException(
s"No training result available for the ${this.getClass.getSimpleName}",
new NullPointerException())
}
}

@Since("1.6.0")
@@ -237,7 +258,6 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
oldModel: OldNaiveBayesModel,
parent: NaiveBayes): NaiveBayesModel = {
val uid = if (parent != null) parent.uid else Identifiable.randomUID("nb")
val labels = Vectors.dense(oldModel.labels)
val pi = Vectors.dense(oldModel.pi)
val theta = new DenseMatrix(oldModel.labels.length, oldModel.theta(0).length,
oldModel.theta.flatten, true)
12 changes: 12 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -257,6 +257,18 @@ class RFormulaModel private[feature](
"Label column already exists and is not of type DoubleType.")
}

/**
* Get the original array of labels, if it exists.
*/
private[ml] def getOriginalLabels: Option[Array[String]] = {
Contributor:

Should we add an IndexToString transformer at the end of the PipelineModel? I think it would be more general. Other functions, such as glm with the "binomial" family, should also do the same.

Contributor Author:

I'm rewriting it now.

// According to the sequence of transformers in RFormula, if the last stage is a
// StringIndexerModel, then we can extract the original labels from it.
pipelineModel.stages.last match {
case m: StringIndexerModel => Some(m.labels)
case _ => None
}
}

@Since("2.0.0")
override def write: MLWriter = new RFormulaModel.RFormulaModelWriter(this)
}
87 changes: 73 additions & 14 deletions mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -17,11 +17,11 @@

package org.apache.spark.ml.api.r

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.{RFormula, VectorAssembler}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.sql.DataFrame

@@ -52,6 +52,45 @@ private[r] object SparkRWrappers {
pipeline.fit(df)
}

def fitNaiveBayes(
value: String,
df: DataFrame,
lambda: Double,
modelType: String): PipelineModel = {

// Transform data with RFormula
val formula = new RFormula().setFormula(value)
val fModel = formula.fit(df)
val rawLabels = fModel.getOriginalLabels

val naiveBayes = new NaiveBayes().setSmoothing(lambda).setModelType(modelType)
val rawLabelsIndexer = new IndexToString()
.setInputCol(naiveBayes.getLabelCol).setOutputCol("rawLabelsPrediction")

if (fModel.getOriginalLabels.isDefined) {
// String labels have already been re-indexed by RFormula.
val stages: Array[PipelineStage] =
Array(fModel, naiveBayes, rawLabelsIndexer.setLabels(rawLabels.get))
new Pipeline().setStages(stages).fit(df)
} else {
// Re-index numerical labels for NaiveBayes since it assumes labels are indices.
val labelIndexer = new StringIndexer().setInputCol(fModel.getLabelCol).fit(df)
val stages: Array[PipelineStage] =
Array(
labelIndexer,
fModel,
naiveBayes.setLabelCol(labelIndexer.getOutputCol),
rawLabelsIndexer.setLabels(labelIndexer.labels))
new Pipeline().setStages(stages).fit(df)
}
}

def isNaiveBayesModel(model: PipelineModel): Boolean = {
model.stages.length >= 2 &&
model.stages(model.stages.length - 2).isInstanceOf[NaiveBayesModel] &&
model.stages.last.isInstanceOf[IndexToString]
}

def fitKMeans(
df: DataFrame,
initMode: String,
@@ -70,7 +109,7 @@ private[r] object SparkRWrappers {

def getModelCoefficients(model: PipelineModel): Array[Double] = {
model.stages.last match {
case m: LinearRegressionModel => {
case m: LinearRegressionModel =>
val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++
m.summary.coefficientStandardErrors.dropRight(1)
val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1)
@@ -81,16 +120,15 @@ private[r] object SparkRWrappers {
} else {
m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR
}
}
case m: LogisticRegressionModel => {
case m: LogisticRegressionModel =>
if (m.getFitIntercept) {
Array(m.intercept) ++ m.coefficients.toArray
} else {
m.coefficients.toArray
}
}
case m: KMeansModel =>
m.clusterCenters.flatMap(_.toArray)
case _ if isNaiveBayesModel(model) => Array() // A dummy result to prevent unmatched error.
}
}

@@ -129,7 +167,28 @@ private[r] object SparkRWrappers {
}
}

def getModelFeatures(model: PipelineModel): Array[String] = {
/**
* Extract label names for NaiveBayesModel.
*/
def getNaiveBayesLabels(model: PipelineModel): Array[String] = {
assert(isNaiveBayesModel(model),
s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
model.stages.last.asInstanceOf[IndexToString].getLabels
}

def getNaiveBayesPi(model: PipelineModel): Array[Double] = {
assert(isNaiveBayesModel(model),
s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].pi.toArray.map(math.exp)
}

def getNaiveBayesTheta(model: PipelineModel): Array[Double] = {
assert(isNaiveBayesModel(model),
s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].theta.toArray.map(math.exp)
}

def getModelFeatures(model: PipelineModel): Array[String] = {
model.stages.last match {
case m: LinearRegressionModel =>
val attrs = AttributeGroup.fromStructField(
@@ -151,17 +210,17 @@ private[r] object SparkRWrappers {
val attrs = AttributeGroup.fromStructField(
m.summary.predictions.schema(m.summary.featuresCol))
attrs.attributes.get.map(_.name.get)
case _ if isNaiveBayesModel(model) =>
model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].getFeatureNames
}
}

def getModelName(model: PipelineModel): String = {
model.stages.last match {
case m: LinearRegressionModel =>
"LinearRegressionModel"
case m: LogisticRegressionModel =>
"LogisticRegressionModel"
case m: KMeansModel =>
"KMeansModel"
case m: LinearRegressionModel => "LinearRegressionModel"
case m: LogisticRegressionModel => "LogisticRegressionModel"
case m: KMeansModel => "KMeansModel"
case _ if isNaiveBayesModel(model) => "NaiveBayesModel"
}
}
}