Skip to content

Commit 5c49428

Browse files
committed
SparkR support add files to Spark job and get by executors.
1 parent 6a6adb1 commit 5c49428

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

R/pkg/R/context.R

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,37 @@ setCheckpointDir <- function(sc, dirName) {
225225
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
226226
}
227227

228+
#' Add a file or directory to be downloaded with this Spark job on every node.
229+
#'
230+
#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
231+
#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
232+
#' use sparkFiles.get(fileName) to find its download location.
233+
#'
234+
#' A directory can be given if the recursive option is set to true.
235+
#' Currently directories are only supported for Hadoop-supported filesystems.
236+
#'
237+
#' @param path The path of the files to be added
238+
#' @param recursive Recursive or not if the path is directory. Default is FALSE.
239+
#' @noRd
240+
#' @examples
241+
#'\dontrun{
242+
#' sc <- sparkR.init()
243+
#' addFile(sc, "myfile")
244+
#'}
245+
addFile <- function(sc, path) {
246+
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
247+
}
248+
249+
#' Get the root directory that contains files added through addFile.
250+
sparkFiles.getRootDirectory <- function() {
251+
callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
252+
}
253+
254+
#' Get the absolute path of a file added through addFile.
255+
sparkFiles.get <- function(fileName) {
256+
callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
257+
}
258+
228259
#' Run a function over a list of elements, distributing the computations with Spark
229260
#'
230261
#' Run a function over a list of elements, distributing the computations with Spark. Applies a

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,4 +801,15 @@ test_that("Test correct concurrency of RRDD.compute()", {
801801
expect_equal(count, 1000)
802802
})
803803

804+
test_that("add and get file to be downloaded with Spark job on every node", {
805+
path <- tempfile(pattern = "hello", fileext = ".txt")
806+
filename <- basename(path)
807+
words <- "Hello World!"
808+
writeLines(words, path)
809+
addFile(sc, path)
810+
download_path <- sparkFiles.get(filename)
811+
expect_equal(readLines(download_path), words)
812+
unlink(path)
813+
})
814+
804815
sparkR.session.stop()

0 commit comments

Comments
 (0)