@@ -190,7 +190,7 @@ setMethod("flatMapValues",
190190# ' @rdname partitionBy
191191# ' @aliases partitionBy,RDD,numeric-method
192192setMethod ("partitionBy ",
193- signature(x = " RDD" , numPartitions = " integer " ),
193+ signature(x = " RDD" , numPartitions = " numeric " ),
194194 function (x , numPartitions , partitionFunc = hashCode ) {
195195
196196 # if (missing(partitionFunc)) {
@@ -206,12 +206,12 @@ setMethod("partitionBy",
206206 get(name , .broadcastNames ) })
207207 jrdd <- getJRDD(x )
208208
209- # We create a PairwiseRRDD that extends RDD[(Array[Byte],
210- # Array[Byte])], where the key is the hashed split , the value is
209+ # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte]) ],
210+ # where the key is the target partition number , the value is
211211 # the content (key-val pairs).
212212 pairwiseRRDD <- newJObject(" org.apache.spark.api.r.PairwiseRRDD" ,
213213 callJMethod(jrdd , " rdd" ),
214- as.integer (numPartitions ),
214+ numToInt (numPartitions ),
215215 serializedHashFuncBytes ,
216216 getSerializedMode(x ),
217217 packageNamesArr ,
@@ -221,7 +221,7 @@ setMethod("partitionBy",
221221
222222 # Create a corresponding partitioner.
223223 rPartitioner <- newJObject(" org.apache.spark.HashPartitioner" ,
224- as.integer (numPartitions ))
224+ numToInt (numPartitions ))
225225
226226 # Call partitionBy on the obtained PairwiseRDD.
227227 javaPairRDD <- callJMethod(pairwiseRRDD , " asJavaPairRDD" )
@@ -256,7 +256,7 @@ setMethod("partitionBy",
256256# ' @rdname groupByKey
257257# ' @aliases groupByKey,RDD,numeric-method
258258setMethod ("groupByKey ",
259- signature(x = " RDD" , numPartitions = " integer " ),
259+ signature(x = " RDD" , numPartitions = " numeric " ),
260260 function (x , numPartitions ) {
261261 shuffled <- partitionBy(x , numPartitions )
262262 groupVals <- function (part ) {
@@ -315,7 +315,7 @@ setMethod("groupByKey",
315315# ' @rdname reduceByKey
316316# ' @aliases reduceByKey,RDD,ANY,numeric-method
317317setMethod ("reduceByKey ",
318- signature(x = " RDD" , combineFunc = " ANY" , numPartitions = " integer " ),
318+ signature(x = " RDD" , combineFunc = " ANY" , numPartitions = " numeric " ),
319319 function (x , combineFunc , numPartitions ) {
320320 reduceVals <- function (part ) {
321321 vals <- new.env()
@@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally",
422422# ' @aliases combineByKey,RDD,ANY,ANY,ANY,numeric-method
423423setMethod ("combineByKey ",
424424 signature(x = " RDD" , createCombiner = " ANY" , mergeValue = " ANY" ,
425- mergeCombiners = " ANY" , numPartitions = " integer " ),
425+ mergeCombiners = " ANY" , numPartitions = " numeric " ),
426426 function (x , createCombiner , mergeValue , mergeCombiners , numPartitions ) {
427427 combineLocally <- function (part ) {
428428 combiners <- new.env()
@@ -483,7 +483,7 @@ setMethod("combineByKey",
483483# ' @aliases aggregateByKey,RDD,ANY,ANY,ANY,numeric-method
484484setMethod ("aggregateByKey ",
485485 signature(x = " RDD" , zeroValue = " ANY" , seqOp = " ANY" ,
486- combOp = " ANY" , numPartitions = " integer " ),
486+ combOp = " ANY" , numPartitions = " numeric " ),
487487 function (x , zeroValue , seqOp , combOp , numPartitions ) {
488488 createCombiner <- function (v ) {
489489 do.call(seqOp , list (zeroValue , v ))
@@ -514,7 +514,7 @@ setMethod("aggregateByKey",
514514# ' @aliases foldByKey,RDD,ANY,ANY,numeric-method
515515setMethod ("foldByKey ",
516516 signature(x = " RDD" , zeroValue = " ANY" ,
517- func = " ANY" , numPartitions = " integer " ),
517+ func = " ANY" , numPartitions = " numeric " ),
518518 function (x , zeroValue , func , numPartitions ) {
519519 aggregateByKey(x , zeroValue , func , func , numPartitions )
520520 })
@@ -553,7 +553,7 @@ setMethod("join",
553553 joinTaggedList(v , list (FALSE , FALSE ))
554554 }
555555
556- joined <- flatMapValues(groupByKey(unionRDD(xTagged , yTagged ), numToInt( numPartitions ) ),
556+ joined <- flatMapValues(groupByKey(unionRDD(xTagged , yTagged ), numPartitions ),
557557 doJoin )
558558 })
559559
@@ -582,7 +582,7 @@ setMethod("join",
582582# ' @rdname join-methods
583583# ' @aliases leftOuterJoin,RDD,RDD-method
584584setMethod ("leftOuterJoin ",
585- signature(x = " RDD" , y = " RDD" , numPartitions = " integer " ),
585+ signature(x = " RDD" , y = " RDD" , numPartitions = " numeric " ),
586586 function (x , y , numPartitions ) {
587587 xTagged <- lapply(x , function (i ) { list (i [[1 ]], list (1L , i [[2 ]])) })
588588 yTagged <- lapply(y , function (i ) { list (i [[1 ]], list (2L , i [[2 ]])) })
@@ -619,7 +619,7 @@ setMethod("leftOuterJoin",
619619# ' @rdname join-methods
620620# ' @aliases rightOuterJoin,RDD,RDD-method
621621setMethod ("rightOuterJoin ",
622- signature(x = " RDD" , y = " RDD" , numPartitions = " integer " ),
622+ signature(x = " RDD" , y = " RDD" , numPartitions = " numeric " ),
623623 function (x , y , numPartitions ) {
624624 xTagged <- lapply(x , function (i ) { list (i [[1 ]], list (1L , i [[2 ]])) })
625625 yTagged <- lapply(y , function (i ) { list (i [[1 ]], list (2L , i [[2 ]])) })
@@ -659,7 +659,7 @@ setMethod("rightOuterJoin",
659659# ' @rdname join-methods
660660# ' @aliases fullOuterJoin,RDD,RDD-method
661661setMethod ("fullOuterJoin ",
662- signature(x = " RDD" , y = " RDD" , numPartitions = " integer " ),
662+ signature(x = " RDD" , y = " RDD" , numPartitions = " numeric " ),
663663 function (x , y , numPartitions ) {
664664 xTagged <- lapply(x , function (i ) { list (i [[1 ]], list (1L , i [[2 ]])) })
665665 yTagged <- lapply(y , function (i ) { list (i [[1 ]], list (2L , i [[2 ]])) })
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
866866 }
867867
868868 # The sampler: takes a partition and returns its sampled version.
869- samplingFunc <- function (split , part ) {
870- set.seed(bitwXor(seed , split ))
869+ samplingFunc <- function (partIndex , part ) {
870+ set.seed(bitwXor(seed , partIndex ))
871871 res <- vector(" list" , length(part ))
872872 len <- 0
873873
0 commit comments