@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
6666 .Object
6767})
6868
69+ setMethod("show", "RDD",
70+           function(.Object) {
71+             cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
72+           })
73+
6974setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
7075  .Object@env <- new.env()
7176  .Object@env$isCached <- FALSE
@@ -91,8 +96,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
9196 # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
9297 # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
9398 } else {
94-     pipelinedFunc <- function(split, iterator) {
95-       func(split, prev@func(split, iterator))
99+     pipelinedFunc <- function(partIndex, part) {
100+       func(partIndex, prev@func(partIndex, part))
96101    }
97102    .Object@func <- cleanClosure(pipelinedFunc)
98103    .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +311,7 @@ setMethod("numPartitions",
306311          signature(x = "RDD"),
307312          function(x) {
308313            jrdd <- getJRDD(x)
309-             partitions <- callJMethod(jrdd, "splits")
314+             partitions <- callJMethod(jrdd, "partitions")
310315            callJMethod(partitions, "size")
311316          })
312317
@@ -452,8 +457,8 @@ setMethod("countByValue",
452457setMethod("lapply",
453458          signature(X = "RDD", FUN = "function"),
454459          function(X, FUN) {
455-             func <- function(split, iterator) {
456-               lapply(iterator, FUN)
460+             func <- function(partIndex, part) {
461+               lapply(part, FUN)
457462            }
458463            lapplyPartitionsWithIndex(X, func)
459464          })
@@ -538,8 +543,8 @@ setMethod("mapPartitions",
538543#'\dontrun{
539544#' sc <- sparkR.init()
540545#' rdd <- parallelize(sc, 1:10, 5L)
541- #' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
542- #'                                          split * Reduce("+", part) })
546+ #' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
547+ #'                                          partIndex * Reduce("+", part) })
543548#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
544549#'}
545550#' @rdname lapplyPartitionsWithIndex
@@ -813,7 +818,7 @@ setMethod("distinct",
813818#' @examples
814819#'\dontrun{
815820#' sc <- sparkR.init()
816- #' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
821+ #' rdd <- parallelize(sc, 1:10)
817822#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
818823#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
819824#'}
@@ -825,14 +830,14 @@ setMethod("sampleRDD",
825830          function(x, withReplacement, fraction, seed) {
826831
827832            # The sampler: takes a partition and returns its sampled version.
828-             samplingFunc <- function(split, part) {
833+             samplingFunc <- function(partIndex, part) {
829834              set.seed(seed)
830835              res <- vector("list", length(part))
831836              len <- 0
832837
833838              # Discards some random values to ensure each partition has a
834839              # different random seed.
835-               runif(split)
840+               runif(partIndex)
836841
837842              for (elem in part) {
838843                if (withReplacement) {
@@ -967,7 +972,7 @@ setMethod("keyBy",
967972setMethod("repartition",
968973          signature(x = "RDD", numPartitions = "numeric"),
969974          function(x, numPartitions) {
970-             coalesce(x, numToInt(numPartitions), TRUE)
975+             coalesce(x, numPartitions, TRUE)
971976          })
972977
973978#' Return a new RDD that is reduced into numPartitions partitions.
@@ -989,8 +994,8 @@ setMethod("coalesce",
989994          function(x, numPartitions, shuffle = FALSE) {
990995            numPartitions <- numToInt(numPartitions)
991996            if (shuffle || numPartitions > SparkR::numPartitions(x)) {
992-               func <- function(s, part) {
993-                 set.seed(s)  # split as seed
997+               func <- function(partIndex, part) {
998+                 set.seed(partIndex)  # partIndex as seed
994999                start <- as.integer(sample(numPartitions, 1) - 1)
9951000                lapply(seq_along(part),
9961001                       function(i) {
@@ -1035,7 +1040,7 @@ setMethod("saveAsObjectFile",
10351040#' Save this RDD as a text file, using string representations of elements.
10361041#'
10371042#' @param x The RDD to save
1038- #' @param path The directory where the splits of the text file are saved
1043+ #' @param path The directory where the partitions of the text file are saved
10391044#' @examples
10401045#'\dontrun{
10411046#' sc <- sparkR.init()
@@ -1335,10 +1340,10 @@ setMethod("zipWithUniqueId",
13351340          function(x) {
13361341            n <- numPartitions(x)
13371342
1338-             partitionFunc <- function(split, part) {
1343+             partitionFunc <- function(partIndex, part) {
13391344              mapply(
13401345                function(item, index) {
1341-                   list(item, (index - 1) * n + split)
1346+                   list(item, (index - 1) * n + partIndex)
13421347                },
13431348                part,
13441349                seq_along(part),
@@ -1382,11 +1387,11 @@ setMethod("zipWithIndex",
13821387              startIndices <- Reduce("+", nums, accumulate = TRUE)
13831388            }
13841389
1385-             partitionFunc <- function(split, part) {
1386-               if (split == 0) {
1390+             partitionFunc <- function(partIndex, part) {
1391+               if (partIndex == 0) {
13871392                startIndex <- 0
13881393              } else {
1389-                 startIndex <- startIndices[[split]]
1394+                 startIndex <- startIndices[[partIndex]]
13901395              }
13911396
13921397              mapply(
@@ -1590,3 +1595,49 @@ setMethod("intersection",
15901595
15911596 keys(filterRDD(cogroup(rdd1 , rdd2 , numPartitions = numPartitions ), filterFunction ))
15921597 })
1598+
1599+ #' Zips an RDD's partitions with one (or more) RDD(s).
1600+ #' Same as zipPartitions in Spark.
1601+ #'
1602+ #' @param ... RDDs to be zipped.
1603+ #' @param func A function to transform zipped partitions.
1604+ #' @return A new RDD by applying a function to the zipped partitions.
1605+ #'         Assumes that all the RDDs have the *same number of partitions*, but
1606+ #'         does *not* require them to have the same number of elements in each partition.
1607+ #' @examples
1608+ #'\dontrun{
1609+ #' sc <- sparkR.init()
1610+ #' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
1611+ #' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
1612+ #' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
1613+ #' collect(zipPartitions(rdd1, rdd2, rdd3,
1614+ #'                       func = function(x, y, z) { list(list(x, y, z))} ))
1615+ #' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1616+ #'}
1617+ #' @rdname zipRDD
1618+ #' @aliases zipPartitions,RDD
1619+ setMethod("zipPartitions",
1620+           "RDD",
1621+           function(..., func) {
1622+             rrdds <- list(...)
1623+             if (length(rrdds) == 1) {
1624+               return(rrdds[[1]])
1625+             }
1626+             nPart <- sapply(rrdds, numPartitions)
1627+             if (length(unique(nPart)) != 1) {
1628+               stop("Can only zipPartitions RDDs which have the same number of partitions.")
1629+             }
1630+
1631+             rrdds <- lapply(rrdds, function(rdd) {
1632+               mapPartitionsWithIndex(rdd, function(partIndex, part) {
1633+                 print(length(part))
1634+                 list(list(partIndex, part))
1635+               })
1636+             })
1637+             union.rdd <- Reduce(unionRDD, rrdds)
1638+             zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
1639+             res <- mapPartitions(zipped.rdd, function(plist) {
1640+               do.call(func, plist[[1]])
1641+             })
1642+             res
1643+           })
0 commit comments