Commit ba01358

Merge pull request apache#115 from sun-rui/SPARKR-130
[SPARKR-130] Add persist(storageLevel) API to RDD.
2 parents: 9770312 + 7190a2c

4 files changed: +94 −1 lines

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ exportMethods(
               "minimum",
               "numPartitions",
               "partitionBy",
+              "persist",
               "reduce",
               "reduceByKey",
               "sampleRDD",

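Because persist is an S4 generic, it is exposed through exportMethods() rather than export(). After loading the package, the registered signature can be checked from an R session (a quick interactive sanity check, assuming the package is installed under the name SparkR):

    library(SparkR)
    showMethods("persist")   # should list the signature rdd = "RDD", newLevel = "character"
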
pkg/R/RDD.R

Lines changed: 51 additions & 0 deletions
@@ -173,6 +173,57 @@ setMethod("cache",
             rdd
           })
 
+#' Persist an RDD
+#'
+#' Persist this RDD with the specified storage level. For details of the
+#' supported storage levels, refer to
+#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'
+#' @param rdd The RDD to persist
+#' @param newLevel The new storage level to be assigned
+#' @rdname persist
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' persist(rdd, "MEMORY_AND_DISK")
+#'}
+setGeneric("persist", function(rdd, newLevel) { standardGeneric("persist") })
+
+#' @rdname persist
+#' @aliases persist,RDD-method
+setMethod("persist",
+          signature(rdd = "RDD", newLevel = "character"),
+          function(rdd, newLevel = c("DISK_ONLY",
+                                     "DISK_ONLY_2",
+                                     "MEMORY_AND_DISK",
+                                     "MEMORY_AND_DISK_2",
+                                     "MEMORY_AND_DISK_SER",
+                                     "MEMORY_AND_DISK_SER_2",
+                                     "MEMORY_ONLY",
+                                     "MEMORY_ONLY_2",
+                                     "MEMORY_ONLY_SER",
+                                     "MEMORY_ONLY_SER_2",
+                                     "OFF_HEAP")) {
+            match.arg(newLevel)
+            storageLevel <- switch(newLevel,
+                                   "DISK_ONLY" = J("org.apache.spark.storage.StorageLevel")$DISK_ONLY(),
+                                   "DISK_ONLY_2" = J("org.apache.spark.storage.StorageLevel")$DISK_ONLY_2(),
+                                   "MEMORY_AND_DISK" = J("org.apache.spark.storage.StorageLevel")$MEMORY_AND_DISK(),
+                                   "MEMORY_AND_DISK_2" = J("org.apache.spark.storage.StorageLevel")$MEMORY_AND_DISK_2(),
+                                   "MEMORY_AND_DISK_SER" = J("org.apache.spark.storage.StorageLevel")$MEMORY_AND_DISK_SER(),
+                                   "MEMORY_AND_DISK_SER_2" = J("org.apache.spark.storage.StorageLevel")$MEMORY_AND_DISK_SER_2(),
+                                   "MEMORY_ONLY" = J("org.apache.spark.storage.StorageLevel")$MEMORY_ONLY(),
+                                   "MEMORY_ONLY_2" = J("org.apache.spark.storage.StorageLevel")$MEMORY_ONLY_2(),
+                                   "MEMORY_ONLY_SER" = J("org.apache.spark.storage.StorageLevel")$MEMORY_ONLY_SER(),
+                                   "MEMORY_ONLY_SER_2" = J("org.apache.spark.storage.StorageLevel")$MEMORY_ONLY_SER_2(),
+                                   "OFF_HEAP" = J("org.apache.spark.storage.StorageLevel")$OFF_HEAP())
+
+            .jcall(getJRDD(rdd), "Lorg/apache/spark/api/java/JavaRDD;", "persist", storageLevel)
+            rdd@env$isCached <- TRUE
+            rdd
+          })
 
 #' Unpersist an RDD
 #'
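
The new method validates the requested level with match.arg(), maps the string onto the corresponding org.apache.spark.storage.StorageLevel constant through rJava's J(), calls persist on the underlying JavaRDD, and finally flags the RDD as cached. A minimal usage sketch, assuming a local SparkR context and the count() action already exported by the package:

    library(SparkR)
    sc <- sparkR.init("local")
    rdd <- parallelize(sc, 1:1000, 4L)

    persist(rdd, "MEMORY_AND_DISK")   # mark the RDD for MEMORY_AND_DISK storage
    count(rdd)                        # first action computes and caches the partitions
    count(rdd)                        # later actions reuse the persisted partitions

    unpersist(rdd)                    # release the cached data when no longer needed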

pkg/inst/tests/test_rdd.R

Lines changed: 9 additions & 1 deletion
@@ -89,7 +89,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
   collect(rdd2)
 })
 
-test_that("PipelinedRDD support actions: cache(), unpersist(), checkpoint()", {
+test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
   # RDD
   rdd2 <- rdd
   # PipelinedRDD
@@ -107,6 +107,14 @@ test_that("PipelinedRDD support actions: cache(), unpersist(), checkpoint()", {
   unpersist(rdd2)
   expect_false(rdd2@env$isCached)
 
+  persist(rdd2, "MEMORY_AND_DISK")
+  expect_true(rdd2@env$isCached)
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+
+  unpersist(rdd2)
+  expect_false(rdd2@env$isCached)
+
   setCheckpointDir(sc, "checkpoints")
   checkpoint(rdd2)
   expect_true(rdd2@env$isCheckpointed)
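
The added assertions rely on persist() and unpersist() only toggling the isCached flag on the RDD they are called on, while a transformation such as lapply() returns a fresh PipelinedRDD whose flag starts out FALSE. A condensed sketch of that behaviour, reusing the helpers from the test above:

    rdd <- parallelize(sc, 1:10, 2L)
    persist(rdd, "MEMORY_AND_DISK")
    rdd@env$isCached                      # TRUE: persist() set the flag on this RDD
    mapped <- lapply(rdd, function(x) x)  # derived PipelinedRDD
    mapped@env$isCached                   # FALSE: the new RDD is not cached
    unpersist(rdd)
    rdd@env$isCached                      # FALSE again after unpersist()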

pkg/man/persist.Rd

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{persist}
+\alias{persist}
+\alias{persist,RDD,character-method}
+\alias{persist,RDD-method}
+\title{Persist an RDD}
+\usage{
+persist(rdd, newLevel)
+
+\S4method{persist}{RDD,character}(rdd, newLevel = c("DISK_ONLY",
+  "DISK_ONLY_2", "MEMORY_AND_DISK", "MEMORY_AND_DISK_2", "MEMORY_AND_DISK_SER",
+  "MEMORY_AND_DISK_SER_2", "MEMORY_ONLY", "MEMORY_ONLY_2", "MEMORY_ONLY_SER",
+  "MEMORY_ONLY_SER_2", "OFF_HEAP"))
+}
+\arguments{
+\item{rdd}{The RDD to persist}
+
+\item{newLevel}{The new storage level to be assigned}
+}
+\description{
+Persist this RDD with the specified storage level. For details of the
+supported storage levels, refer to
+http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, 1:10, 2L)
+persist(rdd, "MEMORY_AND_DISK")
+}
+}
+
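
persist.Rd is generated by roxygen2 from the #' comments in pkg/R/RDD.R, so documentation changes belong in those comments and the .Rd file is regenerated rather than edited by hand. A hypothetical regeneration step, assuming roxygen2 4.0.x is installed and the command is run from the repository root:

    library(roxygen2)
    roxygenize("pkg")   # rewrites pkg/man/*.Rd, including persist.Rd, from the roxygen comments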
