
Commit c9a876e

felixcheung authored and cmonkey committed
[SPARK-18788][SPARKR] Add API for getNumPartitions
## What changes were proposed in this pull request?

Adds a `getNumPartitions` API for SparkDataFrame, with doc to say this would convert the DF into an RDD.

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <[email protected]>

Closes apache#16668 from felixcheung/rgetnumpartitions.
1 parent c768a90 commit c9a876e
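
A quick usage sketch of the new user-facing API, adapted from the roxygen example added in this commit (assumes a local Spark session; `cars` is the built-in R dataset):

```r
library(SparkR)
sparkR.session()

# Create a SparkDataFrame with an explicit partition count,
# then read that count back through the new API.
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)  # 2

sparkR.session.stop()
```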

File tree

7 files changed (+59 −31 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions

```diff
@@ -95,6 +95,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",
```

R/pkg/R/DataFrame.R

Lines changed: 23 additions & 0 deletions

```diff
@@ -3428,3 +3428,26 @@ setMethod("randomSplit",
             }
             sapply(sdfs, dataFrame)
           })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
```
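
A note on the implementation: `callJMethod` is SparkR's internal bridge for invoking a method on a JVM object reference, and `x@sdf` holds the Java-side Dataset. The nested call therefore first converts the Dataset to an RDD and then asks that RDD for its partition count — which is exactly why the docs warn that this API converts the DF into an RDD. A sketch of the same call spelled out step by step (internal API, hence `:::`; `df` is any SparkDataFrame):

```r
# Equivalent to getNumPartitions(df), decomposed for clarity.
jrdd <- SparkR:::callJMethod(df@sdf, "rdd")             # Dataset -> RDD on the JVM side
n    <- SparkR:::callJMethod(jrdd, "getNumPartitions")  # ask the RDD for its partition count
```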

R/pkg/R/RDD.R

Lines changed: 15 additions & 15 deletions

```diff
@@ -313,7 +313,7 @@ setMethod("checkpoint",
 #' @rdname getNumPartitions
 #' @aliases getNumPartitions,RDD-method
 #' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
           signature(x = "RDD"),
           function(x) {
             callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             .Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
           })

 #' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
           signature(x = "RDD"),
           function(x) {
             ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
           })

 #' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
             resList <- list()
             index <- -1
             jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
             serializedModeRDD <- getSerializedMode(x)

             # TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
 #' @noRd
 setMethod("distinctRDD",
           signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
             reduced <- reduceByKey(identical.mapped,
                                    function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
-            if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+            if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
               func <- function(partIndex, part) {
                 set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
 #' @noRd
 setMethod("sortBy",
           signature(x = "RDD", func = "function"),
-          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             values(sortByKey(keyBy(x, func), ascending, numPartitions))
           })

@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
   resList <- list()
   index <- -1
   jrdd <- getJRDD(newRdd)
-  numPartitions <- getNumPartitions(newRdd)
+  numPartitions <- getNumPartitionsRDD(newRdd)
   serializedModeRDD <- getSerializedMode(newRdd)

   while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
 setMethod("zipWithUniqueId",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)

             partitionFunc <- function(partIndex, part) {
               mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
 setMethod("zipWithIndex",
           signature(x = "RDD"),
           function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
             if (n > 1) {
               nums <- collectRDD(lapplyPartition(x,
                                                  function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
 setMethod("zipRDD",
           signature(x = "RDD", other = "RDD"),
           function(x, other) {
-            n1 <- getNumPartitions(x)
-            n2 <- getNumPartitions(other)
+            n1 <- getNumPartitionsRDD(x)
+            n2 <- getNumPartitionsRDD(other)
             if (n1 != n2) {
               stop("Can only zip RDDs which have the same number of partitions.")
             }
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
 #' @noRd
 setMethod("subtract",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             mapFunction <- function(e) { list(e, NA) }
             rdd1 <- map(x, mapFunction)
             rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
 #' @noRd
 setMethod("intersection",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rdd1 <- map(x, function(v) { list(v, NA) })
             rdd2 <- map(other, function(v) { list(v, NA) })

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
             if (length(rrdds) == 1) {
               return(rrdds[[1]])
             }
-            nPart <- sapply(rrdds, getNumPartitions)
+            nPart <- sapply(rrdds, getNumPartitionsRDD)
             if (length(unique(nPart)) != 1) {
               stop("Can only zipPartitions RDDs which have the same number of partitions.")
             }
```
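
The rename matters because the RDD API in SparkR is private (note the `#' @noRd` tags), while the new SparkDataFrame method is exported. After this commit the two entry points look roughly like this — a sketch, using `:::` to reach the internal RDD functions:

```r
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)               # exported S4 method on SparkDataFrame

rdd <- SparkR:::toRDD(df)          # internal: convert to an RDD
SparkR:::getNumPartitionsRDD(rdd)  # renamed internal RDD method
SparkR:::numPartitions(rdd)        # still works, but emits a deprecation warning
```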

R/pkg/R/generics.R

Lines changed: 6 additions & 2 deletions

```diff
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
 # @export
 setGeneric("name", function(x) { standardGeneric("name") })

-# @rdname getNumPartitions
+# @rdname getNumPartitionsRDD
 # @export
-setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })

 # @rdname getNumPartitions
 # @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
 #' @export
 setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })

+# @rdname getNumPartitions
+# @export
+setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
```

R/pkg/R/pairRDD.R

Lines changed: 2 additions & 2 deletions

```diff
@@ -780,7 +780,7 @@ setMethod("cogroup",
 #' @noRd
 setMethod("sortByKey",
           signature(x = "RDD"),
-          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             rangeBounds <- list()

             if (numPartitions > 1) {
@@ -850,7 +850,7 @@ setMethod("sortByKey",
 #' @noRd
 setMethod("subtractByKey",
           signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             filterFunction <- function(elem) {
               iters <- elem[[2]]
               (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
```

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 5 additions & 5 deletions

```diff
@@ -29,8 +29,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
 intRdd <- parallelize(sc, intPairs, 2L)

 test_that("get number of partitions in RDD", {
-  expect_equal(getNumPartitions(rdd), 2)
-  expect_equal(getNumPartitions(intRdd), 2)
+  expect_equal(getNumPartitionsRDD(rdd), 2)
+  expect_equal(getNumPartitionsRDD(intRdd), 2)
 })

 test_that("first on RDD", {
@@ -305,18 +305,18 @@ test_that("repartition/coalesce on RDDs", {

   # repartition
   r1 <- repartitionRDD(rdd, 2)
-  expect_equal(getNumPartitions(r1), 2L)
+  expect_equal(getNumPartitionsRDD(r1), 2L)
   count <- length(collectPartition(r1, 0L))
   expect_true(count >= 8 && count <= 12)

   r2 <- repartitionRDD(rdd, 6)
-  expect_equal(getNumPartitions(r2), 6L)
+  expect_equal(getNumPartitionsRDD(r2), 6L)
   count <- length(collectPartition(r2, 0L))
   expect_true(count >= 0 && count <= 4)

   # coalesce
   r3 <- coalesce(rdd, 1)
-  expect_equal(getNumPartitions(r3), 1L)
+  expect_equal(getNumPartitionsRDD(r3), 1L)
   count <- length(collectPartition(r3, 0L))
   expect_equal(count, 20)
 })
```

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 7 additions & 7 deletions

```diff
@@ -196,26 +196,26 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)

   df <- as.DataFrame(cars, numPartitions = 2)
-  expect_equal(getNumPartitions(toRDD(df)), 2)
+  expect_equal(getNumPartitions(df), 2)
   df <- createDataFrame(cars, numPartitions = 3)
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)
   # validate limit by num of rows
   df <- createDataFrame(cars, numPartitions = 60)
-  expect_equal(getNumPartitions(toRDD(df)), 50)
+  expect_equal(getNumPartitions(df), 50)
   # validate when 1 < (length(coll) / numSlices) << length(coll)
   df <- createDataFrame(cars, numPartitions = 20)
-  expect_equal(getNumPartitions(toRDD(df)), 20)
+  expect_equal(getNumPartitions(df), 20)

   df <- as.DataFrame(data.frame(0))
   expect_is(df, "SparkDataFrame")
   df <- createDataFrame(list(list(1)))
   expect_is(df, "SparkDataFrame")
   df <- as.DataFrame(data.frame(0), numPartitions = 2)
   # no data to partition, goes to 1
-  expect_equal(getNumPartitions(toRDD(df)), 1)
+  expect_equal(getNumPartitions(df), 1)

   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -234,7 +234,7 @@ test_that("createDataFrame uses files for large objects", {
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
   df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
-  expect_equal(getNumPartitions(toRDD(df)), 3)
+  expect_equal(getNumPartitions(df), 3)

   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
```
