
Commit 769a909

thunterdb authored and mengxr committed
[SPARK-7264][ML] Parallel lapply for sparkR
## What changes were proposed in this pull request?

This PR adds a new function to SparkR called `spark.lapply(sc, list, func)`. It implements a distributed version of `lapply`, using Spark as the backend.

TODO:
- [x] check documentation
- [ ] check tests

Trivial example in SparkR:

```R
spark.lapply(sc, 1:5, function(x) { 2 * x })
```

Output:

```
[[1]]
[1] 2

[[2]]
[1] 4

[[3]]
[1] 6

[[4]]
[1] 8

[[5]]
[1] 10
```

Here is a slightly more complex example that performs distributed training of multiple models. Under the hood, Spark broadcasts the dataset to the workers.

```R
library("MASS")
data(menarche)
families <- c("gaussian", "poisson")
train <- function(family) { glm(Menarche ~ Age, family = family, data = menarche) }
results <- spark.lapply(sc, families, train)
```

## How was this patch tested?

This PR was tested in SparkR. I am unfamiliar with R and SparkR, so any feedback on style, testing, etc. will be much appreciated.

cc falaki davies

Author: Timothy Hunter <[email protected]>

Closes #12426 from thunterdb/7264.
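Because `spark.lapply` collects its results back to the driver, the returned value is an ordinary local list. For instance, the fitted models from the training example above can be inspected with plain R; this is a sketch building on that snippet, not part of the commit:

```R
# `results` is a local list of glm models, so standard R functions apply.
coefs <- lapply(results, coef)   # extract the fitted coefficients per family
names(coefs) <- families
print(coefs)
```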
1 parent 4607f6e commit 769a909

File tree

3 files changed (+49, -0 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
```diff
@@ -295,6 +295,7 @@ export("as.DataFrame",
        "read.json",
        "read.parquet",
        "read.text",
+       "spark.lapply",
        "sql",
        "str",
        "tableToDF",
```

R/pkg/R/context.R

Lines changed: 42 additions & 0 deletions
```diff
@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
+#' @title Run a function over a list of elements, distributing the computations with Spark.
+#'
+#' @description
+#' Applies a function to the elements of a list, in a manner similar to doParallel or lapply.
+#' The computations are distributed using Spark. It is conceptually the same as the following code:
+#'   lapply(list, func)
+#'
+#' Known limitations:
+#' - variable scoping and capture: compared to R's rich support for variable resolution, the
+#'   distributed nature of SparkR limits how variables are resolved at runtime. All the variables
+#'   that are available through lexical scoping are embedded in the closure of the function and
+#'   available as read-only variables within the function. Environment variables should be
+#'   stored into temporary variables outside the function, and not directly accessed within the
+#'   function.
+#'
+#' - loading external packages: in order to use a package, you need to load it inside the
+#'   closure. For example, if you rely on the MASS package, here is how you would use it:
+#'\dontrun{
+#' train <- function(hyperparam) {
+#'   library(MASS)
+#'   model <- lm.ridge("y ~ x+z", data, lambda = hyperparam)
+#'   model
+#' }
+#'}
+#'
+#' @rdname spark.lapply
+#' @param sc Spark Context to use
+#' @param list the list of elements
+#' @param func a function that takes one argument
+#' @return a list of results (the exact type being determined by the function)
+#' @export
+#' @examples
+#'\dontrun{
+#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+#'}
+spark.lapply <- function(sc, list, func) {
+  rdd <- parallelize(sc, list, length(list))
+  results <- map(rdd, func)
+  local <- collect(results)
+  local
+}
+
 #' Set new log level
 #'
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
```
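The variable-scoping caveat in the roxygen block above is easiest to see with a concrete pattern: read any environment-dependent value into a plain local variable on the driver first, so it is captured read-only in the closure shipped to the workers. A minimal sketch (hypothetical names such as `multiplier`; assumes a SparkContext `sc` from `sparkR.init()`, as in this PR's test):

```R
# Capture the value into a local variable so it is embedded (read-only)
# in the closure that spark.lapply ships to the workers.
multiplier <- as.numeric(Sys.getenv("MULTIPLIER", "3"))  # read once, on the driver
scaled <- spark.lapply(sc, 1:4, function(x) {
  # `multiplier` is available here via lexical scoping, as a read-only copy.
  x * multiplier
})
# scaled == list(3, 6, 9, 12)
```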

R/pkg/inst/tests/testthat/test_context.R

Lines changed: 6 additions & 0 deletions
```diff
@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
   expect_that(processSparkJars(f), not(gives_warning()))
   expect_match(processSparkJars(f), f)
 })
+
+test_that("spark.lapply should perform simple transforms", {
+  sc <- sparkR.init()
+  doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+  expect_equal(doubled, as.list(2 * 1:10))
+})
```
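Since the documentation describes `spark.lapply` as conceptually equivalent to `lapply(list, func)`, one natural extension of this test is a direct comparison against base `lapply`. This is a sketch only, not part of the commit:

```R
# Sketch: spark.lapply should agree with base lapply on a pure function.
test_that("spark.lapply matches base lapply", {
  sc <- sparkR.init()
  f <- function(x) { x^2 + 1 }
  expect_equal(spark.lapply(sc, 1:5, f), lapply(1:5, f))
})
```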
