Commit f83d81d

merge with master
2 parents 7aa43b1 + 0fe8020


874 files changed: +20479 −8884 lines

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
 derby.log
 dev/create-release/*final
 dev/create-release/*txt
+dev/pr-deps/
 dist/
 docs/_site
 docs/api

R/WINDOWS.md

Lines changed: 1 addition & 1 deletion

@@ -38,6 +38,6 @@ To run the SparkR unit tests on Windows, the following steps are required —ass
 
 ```
 R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
-.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
+.\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
 ```

R/pkg/NAMESPACE

Lines changed: 3 additions & 1 deletion

@@ -65,7 +65,8 @@ exportMethods("glm",
               "spark.logit",
               "spark.randomForest",
               "spark.gbt",
-              "spark.bisectingKmeans")
+              "spark.bisectingKmeans",
+              "spark.svmLinear")
 
 # Job group lifecycle management methods
 export("setJobGroup",
@@ -81,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",

R/pkg/R/DataFrame.R

Lines changed: 51 additions & 3 deletions

@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })
 
+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'             the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'             using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1764,6 +1804,10 @@ setClassUnion("numericOrcharacter", c("numeric", "character"))
 #' @note [[ since 1.4.0
 setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
           function(x, i) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
             if (is.numeric(i)) {
               cols <- columns(x)
               i <- cols[[i]]
@@ -1777,6 +1821,10 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
 #' @note [[<- since 2.1.1
 setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
           function(x, i, value) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
             if (is.numeric(i)) {
               cols <- columns(x)
               i <- cols[[i]]

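To make the narrow-dependency distinction in the new documentation concrete, here is a minimal SparkR sketch (not part of the commit; the toy data frame is illustrative): coalesce() reduces the partition count without a shuffle, while repartition() adds a shuffle so the upstream work still runs in parallel before collapsing.

```
library(SparkR)
sparkR.session()

# Illustrative data; any SparkDataFrame behaves the same way.
df <- createDataFrame(data.frame(id = 1:1000, value = rnorm(1000)))

# Narrow dependency: no shuffle, each of the new partitions absorbs
# several of the existing ones.
narrowed <- coalesce(df, 2L)

# Drastic collapse (e.g. to a single partition): prefer repartition(),
# which adds a shuffle step but keeps the upstream computation parallel.
collapsed <- repartition(df, numPartitions = 1L)
```
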
R/pkg/R/RDD.R

Lines changed: 2 additions & 2 deletions

@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)

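The rename is driven by an S4 constraint rather than a behaviour change: a generic carries a single set of formals, so the old RDD-oriented coalesce generic could not keep its name once the user-facing generic in generics.R needed looser formals. A minimal illustration (not from the commit):

```
library(methods)

# The old internal generic fixed numPartitions as the second argument:
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })

# The new public generic must also cover coalesce(col1, col2, ...) on Columns,
# so it needs the looser formals below; redefining the generic would break
# methods written against the old signature.
setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })

# Hence the internal RDD method now dispatches on its own generic, coalesceRDD.
```
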
R/pkg/R/functions.R

Lines changed: 24 additions & 2 deletions

@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })
 
+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
 #' @name column
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),

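A hedged usage sketch for the new Column variant (the data and column names below are illustrative, not from the commit): per row, coalesce() yields the first non-NA value among its arguments.

```
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(a = c(NA, 2, NA),
                                 b = c(1, NA, NA),
                                 c = c(9, 9, 9)))

# For each row, take the first non-NA value among a, b and c.
head(select(df, coalesce(df$a, df$b, df$c)))
```
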
R/pkg/R/generics.R

Lines changed: 14 additions & 3 deletions

@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })
 
 # @rdname checkpoint-methods
 # @export
@@ -66,7 +66,7 @@ setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("fre
 # @rdname approxQuantile
 # @export
 setGeneric("approxQuantile",
-           function(x, col, probabilities, relativeError) {
+           function(x, cols, probabilities, relativeError) {
              standardGeneric("approxQuantile")
            })
 
@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })
@@ -1399,7 +1406,11 @@ setGeneric("spark.randomForest",
 
 #' @rdname spark.survreg
 #' @export
-setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })
+setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
+
+#' @rdname spark.svmLinear
+#' @export
+setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") })
 
 #' @rdname spark.lda
 #' @export

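Only the spark.svmLinear generic appears in this file; the method implementation lives elsewhere in the commit. A hedged usage sketch, assuming the 2.2-era SparkR MLlib wrapper API (the regParam argument and the iris-based data are assumptions, not shown in this excerpt):

```
library(SparkR)
sparkR.session()

df <- createDataFrame(iris)
training <- df[df$Species %in% c("versicolor", "virginica"), ]

# Fit a linear SVM for binary classification; regParam is assumed from the
# 2.2-era wrapper API and is not part of this generics.R diff.
model <- spark.svmLinear(training, Species ~ ., regParam = 0.01)
summary(model)
predictions <- predict(model, training)
```
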
R/pkg/R/install.R

Lines changed: 13 additions & 3 deletions

@@ -21,9 +21,9 @@
 #' Download and Install Apache Spark to a Local Directory
 #'
 #' \code{install.spark} downloads and installs Spark to a local directory if
-#' it is not found. The Spark version we use is the same as the SparkR version.
-#' Users can specify a desired Hadoop version, the remote mirror site, and
-#' the directory where the package is installed locally.
+#' it is not found. If SPARK_HOME is set in the environment, and that directory is found, that is
+#' returned. The Spark version we use is the same as the SparkR version. Users can specify a desired
+#' Hadoop version, the remote mirror site, and the directory where the package is installed locally.
 #'
 #' The full url of remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
 #' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
@@ -68,6 +68,16 @@
 #' \href{http://spark.apache.org/downloads.html}{Apache Spark}
 install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                           localDir = NULL, overwrite = FALSE) {
+  sparkHome <- Sys.getenv("SPARK_HOME")
+  if (isSparkRShell()) {
+    stopifnot(nchar(sparkHome) > 0)
+    message("Spark is already running in sparkR shell.")
+    return(invisible(sparkHome))
+  } else if (!is.na(file.info(sparkHome)$isdir)) {
+    message("Spark package found in SPARK_HOME: ", sparkHome)
+    return(invisible(sparkHome))
+  }
+
   version <- paste0("spark-", packageVersion("SparkR"))
   hadoopVersion <- tolower(hadoopVersion)
   hadoopVersionName <- hadoopVersionName(hadoopVersion)

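A short sketch of the new short-circuit behaviour (the path is illustrative): when SPARK_HOME points at an existing directory, install.spark() now returns it instead of downloading a distribution.

```
library(SparkR)

# With SPARK_HOME pointing at an existing installation, nothing is downloaded:
Sys.setenv(SPARK_HOME = "/opt/spark")   # illustrative path
home <- install.spark()
# message: "Spark package found in SPARK_HOME: /opt/spark"

# With SPARK_HOME unset, the previous behaviour is unchanged and Spark is
# downloaded into the local cache for the requested Hadoop build.
Sys.unsetenv("SPARK_HOME")
home <- install.spark(hadoopVersion = "2.7")
```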