Skip to content

Commit fa82b3a

Browse files
committed
Refactor spark.addFile, spark.getSparkFiles API.
1 parent 542b981 commit fa82b3a

File tree

4 files changed

+41
-27
lines changed

4 files changed

+41
-27
lines changed

R/pkg/NAMESPACE

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,9 @@ export("as.DataFrame",
336336
"read.parquet",
337337
"read.text",
338338
"spark.lapply",
339+
"spark.addFile",
340+
"spark.getSparkFilesRootDirectory",
341+
"spark.getSparkFiles",
339342
"sql",
340343
"str",
341344
"tableToDF",

R/pkg/R/context.R

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -229,35 +229,44 @@ setCheckpointDir <- function(sc, dirName) {
229229
#'
230230
#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
231231
#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
232-
#' use sparkFiles.get(fileName) to find its download location.
233-
#'
234-
#' A directory can be given if the recursive option is set to true.
235-
#' Currently directories are only supported for Hadoop-supported filesystems.
232+
#' use spark.getSparkFiles(fileName) to find its download location.
236233
#'
234+
#' @rdname spark.addFile
237235
#' @param path The path of the file to be added
238-
#' @param recursive Recursive or not if the path is directory. Default is FALSE.
239-
#' @noRd
240236
#' @examples
241237
#'\dontrun{
242-
#' sc <- sparkR.init()
243-
#' addFile(sc, "~/myfile")
238+
#' spark.addFile("~/myfile")
244239
#'}
245-
addFile <- function(sc, path) {
240+
#' @note spark.addFile since 2.1.0
241+
# Add a file to be downloaded with this Spark job on every node.
# `path` may be a local file, an HDFS file, or an HTTP/HTTPS/FTP URI;
# workers retrieve it later via spark.getSparkFiles(fileName).
# Returns invisibly (called for its side effect on the SparkContext).
spark.addFile <- function(path) {
  # Resolve to an absolute path; suppress warnings for non-local URIs
  # that normalizePath cannot expand.
  resolved <- suppressWarnings(normalizePath(path))
  invisible(callJMethod(getSparkContext(), "addFile", resolved))
}
248245

249-
#' Get the root directory that contains files added through addFile.
246+
#' Get the root directory that contains files added through spark.addFile.
250247
#'
251-
#' @noRd
252-
sparkFiles.getRootDirectory <- function() {
248+
#' @rdname spark.getSparkFilesRootDirectory
249+
#' @return the root directory that contains files added through spark.addFile
250+
#' @examples
251+
#'\dontrun{
252+
#' spark.getSparkFilesRootDirectory()
253+
#'}
254+
#' @note spark.getSparkFilesRootDirectory since 2.1.0
255+
# Return the root directory that contains files added through
# spark.addFile, as reported by the JVM-side SparkFiles object.
spark.getSparkFilesRootDirectory <- function() {
  root_dir <- callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
  root_dir
}
255258

256-
#' Get the absolute path of a file added through addFile.
259+
#' Get the absolute path of a file added through spark.addFile.
257260
#'
258-
#' @param fileName The name of the file added through addFile
259-
#' @noRd
260-
sparkFiles.get <- function(fileName) {
261+
#' @rdname spark.getSparkFiles
262+
#' @param fileName The name of the file added through spark.addFile
263+
#' @return the absolute path of a file added through spark.addFile.
264+
#' @examples
265+
#'\dontrun{
266+
#' spark.getSparkFiles("myfile")
267+
#'}
268+
#' @note spark.getSparkFiles since 2.1.0
269+
# Return the absolute path of a file added through spark.addFile.
# `fileName` is coerced to character before being handed to the JVM.
spark.getSparkFiles <- function(fileName) {
  name <- as.character(fileName)
  callJStatic("org.apache.spark.SparkFiles", "get", name)
}
263272

R/pkg/inst/tests/testthat/test_context.R

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
166166
expect_equal(doubled, as.list(2 * 1:10))
167167
sparkR.session.stop()
168168
})
169+
170+
test_that("add and get file to be downloaded with Spark job on every node", {
  sparkR.sparkContext()
  path <- tempfile(pattern = "hello", fileext = ".txt")
  # Guarantee cleanup even if spark.addFile / spark.getSparkFiles errors:
  # the original only ran unlink() and session.stop() on the happy path,
  # which leaked the temp file and the Spark session on failure.
  on.exit({
    unlink(path)
    sparkR.session.stop()
  }, add = TRUE)

  filename <- basename(path)
  words <- "Hello World!"
  writeLines(words, path)

  # Distribute the file, then resolve its download location on this node
  # and confirm the contents round-trip unchanged.
  spark.addFile(path)
  download_path <- spark.getSparkFiles(filename)
  expect_equal(readLines(download_path), words)
})

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -801,15 +801,4 @@ test_that("Test correct concurrency of RRDD.compute()", {
801801
expect_equal(count, 1000)
802802
})
803803

804-
test_that("add and get file to be downloaded with Spark job on every node", {
805-
path <- tempfile(pattern = "hello", fileext = ".txt")
806-
filename <- basename(path)
807-
words <- "Hello World!"
808-
writeLines(words, path)
809-
addFile(sc, path)
810-
download_path <- sparkFiles.get(filename)
811-
expect_equal(readLines(download_path), words)
812-
unlink(path)
813-
})
814-
815804
sparkR.session.stop()

0 commit comments

Comments
 (0)