2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -245,7 +245,7 @@ setCheckpointDir <- function(sc, dirName) {
#' \preformatted{
#' train <- function(hyperparam) {
#' library(MASS)
-#' lm.ridge(y ~ x+z, data, lambda=hyperparam)
+#' lm.ridge("y ~ x+z", data, lambda=hyperparam)
#' model
#' }
#' }
77 changes: 77 additions & 0 deletions docs/sparkr.md
@@ -262,6 +262,83 @@ head(df)
{% endhighlight %}
</div>

### Applying User-Defined Functions
In SparkR, we support several kinds of User-Defined Functions:

#### Run a given function on a large dataset using `dapply` or `dapplyCollect`

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function should be a `data.frame`. The schema specifies the row format of the resulting `SparkDataFrame` and must match the output of the R function (a further sketch of this requirement follows the example below).
<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply a UDF to a SparkDataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
{% endhighlight %}
</div>
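The `data.frame` returned by the UDF must line up with the declared schema in both the number of columns and their types. The following is a minimal additional sketch (the names `schema2` and `df2` are illustrative, not part of the example above) in which the function returns only the two columns it declares:
<div data-lang="r" markdown="1">
{% highlight r %}

# The UDF returns exactly the columns declared in the schema; a mismatch in
# column count or types is expected to fail at runtime.
schema2 <- structType(structField("waiting", "double"),
                      structField("waiting_secs", "double"))
df2 <- dapply(df, function(x) {
  data.frame(waiting = x$waiting, waiting_secs = x$waiting * 60)
}, schema2)
head(collect(df2))
{% endhighlight %}
</div>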

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function should be a `data.frame`, but no schema is required to be passed. Note that `dapplyCollect` can be used only if the output of the UDF run on all the partitions can fit in the driver's memory.
<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply a UDF to a SparkDataFrame and return an R data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440

{% endhighlight %}
</div>
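Since the function runs once per partition, `dapplyCollect` can also compute a small per-partition summary rather than a row-wise transformation. Below is a minimal sketch (the names `cnt` and `n_rows` are illustrative) that counts the rows in each partition:
<div data-lang="r" markdown="1">
{% highlight r %}

# One output row per partition is collected back to the driver.
cnt <- dapplyCollect(
         df,
         function(x) {
           data.frame(n_rows = nrow(x))
         })
cnt
{% endhighlight %}
</div>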

#### Run local R functions distributed using `spark.lapply`

##### spark.lapply
Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark. It applies a function to the elements of a list in a manner similar to `doParallel` or `lapply`. The results of all the computations should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use `dapply`.
<div data-lang="r" markdown="1">
{% highlight r %}

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies the family of the generalized linear model.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

{% endhighlight %}
</div>
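The same pattern can be used for hyperparameter tuning, as in the `spark.lapply` example documented in `R/pkg/R/context.R`. Below is a minimal sketch (the lambda values and names are illustrative); any package used inside the function, such as `MASS`, must be installed on every worker:
<div data-lang="r" markdown="1">
{% highlight r %}

# Fit one ridge regression per lambda value in the list.
lambdas <- c(0.01, 0.1, 1, 10)
train.ridge <- function(lambda) {
  library(MASS)
  lm.ridge(Sepal.Length ~ Sepal.Width + Petal.Length, iris, lambda = lambda)
}
ridge.models <- spark.lapply(lambdas, train.ridge)
{% endhighlight %}
</div>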

## Running SQL Queries from SparkR
A SparkR DataFrame can also be registered as a temporary table in Spark SQL. Registering a DataFrame as a table allows you to run SQL queries over its data.
The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.
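For example, the following is a minimal sketch using the `df` built from the `faithful` dataset above (the table name is illustrative; depending on the Spark version, `sql` may instead take a `sqlContext` as its first argument, and `createOrReplaceTempView` may be preferred over `registerTempTable`):
<div data-lang="r" markdown="1">
{% highlight r %}

# Register the DataFrame as a temporary table and query it with SQL.
registerTempTable(df, "faithful_tbl")
long_waits <- sql("SELECT waiting FROM faithful_tbl WHERE waiting > 80")
head(long_waits)

{% endhighlight %}
</div>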