Skip to content

Commit b0392ed

Browse files
Merge branch 'master' into SPARK-18120
2 parents 751ded0 + c0eda7e commit b0392ed

File tree

290 files changed

+9794
-2774
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

290 files changed

+9794
-2774
lines changed

R/pkg/NAMESPACE

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
118
# Imports from base R
219
# Do not include stats:: "rpois", "runif" - causes error at runtime
320
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,8 @@ exportMethods("glm",
4764
"spark.kstest",
4865
"spark.logit",
4966
"spark.randomForest",
50-
"spark.gbt")
67+
"spark.gbt",
68+
"spark.bisectingKmeans")
5169

5270
# Job group lifecycle management methods
5371
export("setJobGroup",
@@ -94,6 +112,7 @@ exportMethods("arrange",
94112
"freqItems",
95113
"gapply",
96114
"gapplyCollect",
115+
"getNumPartitions",
97116
"group_by",
98117
"groupBy",
99118
"head",

R/pkg/R/DataFrame.R

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,23 @@ getColumn <- function(x, c) {
17171717
column(callJMethod(x@sdf, "col", c))
17181718
}
17191719

1720+
setColumn <- function(x, c, value) {
1721+
if (class(value) != "Column" && !is.null(value)) {
1722+
if (isAtomicLengthOne(value)) {
1723+
value <- lit(value)
1724+
} else {
1725+
stop("value must be a Column, literal value as atomic in length of 1, or NULL")
1726+
}
1727+
}
1728+
1729+
if (is.null(value)) {
1730+
nx <- drop(x, c)
1731+
} else {
1732+
nx <- withColumn(x, c, value)
1733+
}
1734+
nx
1735+
}
1736+
17201737
#' @param name name of a Column (without being wrapped by \code{""}).
17211738
#' @rdname select
17221739
#' @name $
@@ -1735,19 +1752,7 @@ setMethod("$", signature(x = "SparkDataFrame"),
17351752
#' @note $<- since 1.4.0
17361753
setMethod("$<-", signature(x = "SparkDataFrame"),
17371754
function(x, name, value) {
1738-
if (class(value) != "Column" && !is.null(value)) {
1739-
if (isAtomicLengthOne(value)) {
1740-
value <- lit(value)
1741-
} else {
1742-
stop("value must be a Column, literal value as atomic in length of 1, or NULL")
1743-
}
1744-
}
1745-
1746-
if (is.null(value)) {
1747-
nx <- drop(x, name)
1748-
} else {
1749-
nx <- withColumn(x, name, value)
1750-
}
1755+
nx <- setColumn(x, name, value)
17511756
x@sdf <- nx@sdf
17521757
x
17531758
})
@@ -1767,6 +1772,21 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
17671772
getColumn(x, i)
17681773
})
17691774

1775+
#' @rdname subset
1776+
#' @name [[<-
1777+
#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
1778+
#' @note [[<- since 2.1.1
1779+
setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
1780+
function(x, i, value) {
1781+
if (is.numeric(i)) {
1782+
cols <- columns(x)
1783+
i <- cols[[i]]
1784+
}
1785+
nx <- setColumn(x, i, value)
1786+
x@sdf <- nx@sdf
1787+
x
1788+
})
1789+
17701790
#' @rdname subset
17711791
#' @name [
17721792
#' @aliases [,SparkDataFrame-method
@@ -1814,6 +1834,8 @@ setMethod("[", signature(x = "SparkDataFrame"),
18141834
#' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
18151835
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
18161836
#' Otherwise, a SparkDataFrame will always be returned.
1837+
#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
1838+
#' If \code{NULL}, the specified Column is dropped.
18171839
#' @param ... currently not used.
18181840
#' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
18191841
#' @export
@@ -3406,3 +3428,26 @@ setMethod("randomSplit",
34063428
}
34073429
sapply(sdfs, dataFrame)
34083430
})
3431+
3432+
#' getNumPartitions
3433+
#'
3434+
#' Return the number of partitions
3435+
#'
3436+
#' @param x A SparkDataFrame
3437+
#' @family SparkDataFrame functions
3438+
#' @aliases getNumPartitions,SparkDataFrame-method
3439+
#' @rdname getNumPartitions
3440+
#' @name getNumPartitions
3441+
#' @export
3442+
#' @examples
3443+
#'\dontrun{
3444+
#' sparkR.session()
3445+
#' df <- createDataFrame(cars, numPartitions = 2)
3446+
#' getNumPartitions(df)
3447+
#' }
3448+
#' @note getNumPartitions since 2.1.1
3449+
setMethod("getNumPartitions",
3450+
signature(x = "SparkDataFrame"),
3451+
function(x) {
3452+
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
3453+
})

R/pkg/R/RDD.R

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ setMethod("checkpoint",
313313
#' @rdname getNumPartitions
314314
#' @aliases getNumPartitions,RDD-method
315315
#' @noRd
316-
setMethod("getNumPartitions",
316+
setMethod("getNumPartitionsRDD",
317317
signature(x = "RDD"),
318318
function(x) {
319319
callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
329329
signature(x = "RDD"),
330330
function(x) {
331331
.Deprecated("getNumPartitions")
332-
getNumPartitions(x)
332+
getNumPartitionsRDD(x)
333333
})
334334

335335
#' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
460460
signature(x = "RDD"),
461461
function(x) {
462462
ones <- lapply(x, function(item) { list(item, 1L) })
463-
collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
463+
collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
464464
})
465465

466466
#' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
780780
resList <- list()
781781
index <- -1
782782
jrdd <- getJRDD(x)
783-
numPartitions <- getNumPartitions(x)
783+
numPartitions <- getNumPartitionsRDD(x)
784784
serializedModeRDD <- getSerializedMode(x)
785785

786786
# TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
846846
#' @noRd
847847
setMethod("distinctRDD",
848848
signature(x = "RDD"),
849-
function(x, numPartitions = SparkR:::getNumPartitions(x)) {
849+
function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
850850
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
851851
reduced <- reduceByKey(identical.mapped,
852852
function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
10531053
signature(x = "RDD", numPartitions = "numeric"),
10541054
function(x, numPartitions, shuffle = FALSE) {
10551055
numPartitions <- numToInt(numPartitions)
1056-
if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
1056+
if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
10571057
func <- function(partIndex, part) {
10581058
set.seed(partIndex) # partIndex as seed
10591059
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
11431143
#' @noRd
11441144
setMethod("sortBy",
11451145
signature(x = "RDD", func = "function"),
1146-
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
1146+
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
11471147
values(sortByKey(keyBy(x, func), ascending, numPartitions))
11481148
})
11491149

@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
11751175
resList <- list()
11761176
index <- -1
11771177
jrdd <- getJRDD(newRdd)
1178-
numPartitions <- getNumPartitions(newRdd)
1178+
numPartitions <- getNumPartitionsRDD(newRdd)
11791179
serializedModeRDD <- getSerializedMode(newRdd)
11801180

11811181
while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
14071407
setMethod("zipWithUniqueId",
14081408
signature(x = "RDD"),
14091409
function(x) {
1410-
n <- getNumPartitions(x)
1410+
n <- getNumPartitionsRDD(x)
14111411

14121412
partitionFunc <- function(partIndex, part) {
14131413
mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
14501450
setMethod("zipWithIndex",
14511451
signature(x = "RDD"),
14521452
function(x) {
1453-
n <- getNumPartitions(x)
1453+
n <- getNumPartitionsRDD(x)
14541454
if (n > 1) {
14551455
nums <- collectRDD(lapplyPartition(x,
14561456
function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
15661566
setMethod("zipRDD",
15671567
signature(x = "RDD", other = "RDD"),
15681568
function(x, other) {
1569-
n1 <- getNumPartitions(x)
1570-
n2 <- getNumPartitions(other)
1569+
n1 <- getNumPartitionsRDD(x)
1570+
n2 <- getNumPartitionsRDD(other)
15711571
if (n1 != n2) {
15721572
stop("Can only zip RDDs which have the same number of partitions.")
15731573
}
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
16371637
#' @noRd
16381638
setMethod("subtract",
16391639
signature(x = "RDD", other = "RDD"),
1640-
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
1640+
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
16411641
mapFunction <- function(e) { list(e, NA) }
16421642
rdd1 <- map(x, mapFunction)
16431643
rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
16711671
#' @noRd
16721672
setMethod("intersection",
16731673
signature(x = "RDD", other = "RDD"),
1674-
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
1674+
function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
16751675
rdd1 <- map(x, function(v) { list(v, NA) })
16761676
rdd2 <- map(other, function(v) { list(v, NA) })
16771677

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
17141714
if (length(rrdds) == 1) {
17151715
return(rrdds[[1]])
17161716
}
1717-
nPart <- sapply(rrdds, getNumPartitions)
1717+
nPart <- sapply(rrdds, getNumPartitionsRDD)
17181718
if (length(unique(nPart)) != 1) {
17191719
stop("Can only zipPartitions RDDs which have the same number of partitions.")
17201720
}

R/pkg/R/generics.R

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
138138
# @export
139139
setGeneric("name", function(x) { standardGeneric("name") })
140140

141-
# @rdname getNumPartitions
141+
# @rdname getNumPartitionsRDD
142142
# @export
143-
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
143+
setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })
144144

145145
# @rdname getNumPartitions
146146
# @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
492492
#' @export
493493
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })
494494

495+
# @rdname getNumPartitions
496+
# @export
497+
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
498+
495499
#' @rdname summary
496500
#' @export
497501
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
@@ -1338,6 +1342,11 @@ setGeneric("rbind", signature = "...")
13381342
#' @export
13391343
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })
13401344

1345+
#' @rdname spark.bisectingKmeans
1346+
#' @export
1347+
setGeneric("spark.bisectingKmeans",
1348+
function(data, formula, ...) { standardGeneric("spark.bisectingKmeans") })
1349+
13411350
#' @rdname spark.gaussianMixture
13421351
#' @export
13431352
setGeneric("spark.gaussianMixture",

0 commit comments

Comments (0)