Commit c3f7444

Merge remote-tracking branch 'remotes/apache/master' into SPARK-19490
2 parents: f790821 + 1a45d2b

726 files changed (+21094 / -11569 lines)

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
 derby.log
 dev/create-release/*final
 dev/create-release/*txt
+dev/pr-deps/
 dist/
 docs/_site
 docs/api

.travis.yml

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

R/pkg/NAMESPACE

Lines changed: 4 additions & 1 deletion
@@ -65,7 +65,8 @@ exportMethods("glm",
               "spark.logit",
               "spark.randomForest",
               "spark.gbt",
-              "spark.bisectingKmeans")
+              "spark.bisectingKmeans",
+              "spark.svmLinear")

 # Job group lifecycle management methods
 export("setJobGroup",
@@ -81,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",
@@ -325,6 +327,7 @@ exportMethods("%in%",
               "toDegrees",
               "toRadians",
               "to_date",
+              "to_timestamp",
               "to_utc_timestamp",
               "translate",
               "trim",

R/pkg/R/DataFrame.R

Lines changed: 45 additions & 4 deletions
@@ -415,7 +415,7 @@ setMethod("coltypes",
               type <- PRIMITIVE_TYPES[[specialtype]]
             }
           }
-          type
+          type[[1]]
         })

 # Find which types don't have mapping to R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })

+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'             the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'             using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1136,6 +1176,7 @@ setMethod("collect",
             if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
               vec <- do.call(c, col)
               stopifnot(class(vec) != "list")
+              class(vec) <- PRIMITIVE_TYPES[[colType]]
               df[[colIndex]] <- vec
             } else {
               df[[colIndex]] <- col
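
As a usage sketch (not part of the commit), the new SparkDataFrame coalesce() method above can be contrasted with repartition(). The session setup, data, and partition counts below are invented for illustration and assume a working SparkR installation:

library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(id = 1:1000))

# coalesce() narrows the existing partitioning without a shuffle,
# so it can only reduce the number of partitions.
narrowDF <- coalesce(df, 1L)

# repartition() adds a shuffle step, which lets the upstream partitions
# run in parallel and can also increase the partition count.
wideDF <- repartition(df, numPartitions = 10L)

sparkR.session.stop()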

R/pkg/R/RDD.R

Lines changed: 2 additions & 2 deletions
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
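
The rename above pairs with the generics.R change later in this commit: the exported coalesce generic now has formals (x, ...) for Column and SparkDataFrame, while the internal RDD variant needs (x, numPartitions, ...). A standalone S4 sketch (not SparkR source) of why the two cannot share one generic name:

# Defining a generic a second time with different formals replaces the first
# definition, so the internal RDD variant gets its own generic name instead.
setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
# setGeneric("coalesce", function(x, numPartitions, ...) { ... })  # would clobber the generic above
setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })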

R/pkg/R/functions.R

Lines changed: 97 additions & 9 deletions
@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })

+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
 #' @name column
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),
@@ -1730,24 +1752,90 @@ setMethod("toRadians",

 #' to_date
 #'
-#' Converts the column into DateType.
+#' Converts the column into a DateType. You may optionally specify a format
+#' according to the rules in:
+#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' If the string cannot be parsed according to the specified format (or default),
+#' the value of the column will be null.
+#' The default format is 'yyyy-MM-dd'.
 #'
-#' @param x Column to compute on.
+#' @param x Column to parse.
+#' @param format string to use to parse x Column to DateType. (optional)
 #'
 #' @rdname to_date
 #' @name to_date
 #' @family datetime_funcs
-#' @aliases to_date,Column-method
+#' @aliases to_date,Column,missing-method
 #' @export
-#' @examples \dontrun{to_date(df$c)}
-#' @note to_date since 1.5.0
+#' @examples
+#' \dontrun{
+#' to_date(df$c)
+#' to_date(df$c, 'yyyy-MM-dd')
+#' }
+#' @note to_date(Column) since 1.5.0
 setMethod("to_date",
-          signature(x = "Column"),
-          function(x) {
+          signature(x = "Column", format = "missing"),
+          function(x, format) {
             jc <- callJStatic("org.apache.spark.sql.functions", "to_date", x@jc)
             column(jc)
           })

+#' @rdname to_date
+#' @name to_date
+#' @family datetime_funcs
+#' @aliases to_date,Column,character-method
+#' @export
+#' @note to_date(Column, character) since 2.2.0
+setMethod("to_date",
+          signature(x = "Column", format = "character"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_date", x@jc, format)
+            column(jc)
+          })
+
+#' to_timestamp
+#'
+#' Converts the column into a TimestampType. You may optionally specify a format
+#' according to the rules in:
+#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
+#' If the string cannot be parsed according to the specified format (or default),
+#' the value of the column will be null.
+#' The default format is 'yyyy-MM-dd HH:mm:ss'.
+#'
+#' @param x Column to parse.
+#' @param format string to use to parse x Column to DateType. (optional)
+#'
+#' @rdname to_timestamp
+#' @name to_timestamp
+#' @family datetime_funcs
+#' @aliases to_timestamp,Column,missing-method
+#' @export
+#' @examples
+#' \dontrun{
+#' to_timestamp(df$c)
+#' to_timestamp(df$c, 'yyyy-MM-dd')
+#' }
+#' @note to_timestamp(Column) since 2.2.0
+setMethod("to_timestamp",
+          signature(x = "Column", format = "missing"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_timestamp", x@jc)
+            column(jc)
+          })
+
+#' @rdname to_timestamp
+#' @name to_timestamp
+#' @family datetime_funcs
+#' @aliases to_timestamp,Column,character-method
+#' @export
+#' @note to_timestamp(Column, character) since 2.2.0
+setMethod("to_timestamp",
+          signature(x = "Column", format = "character"),
+          function(x, format) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_timestamp", x@jc, format)
+            column(jc)
+          })
+
 #' trim
 #'
 #' Trim the spaces from both ends for the specified string column.
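
A brief, hypothetical SparkR snippet combining the column-level coalesce() with the new to_date()/to_timestamp() format arguments documented above; the column names and data are made up for the sketch:

library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(
  a = c(NA, "2017-01-02"),
  b = c("2017-02-03", NA),
  stringsAsFactors = FALSE
))

# coalesce(Column, ...) keeps the first non-NA value per row.
df <- withColumn(df, "first_non_na", coalesce(df$a, df$b))

# to_date()/to_timestamp() accept an optional Java SimpleDateFormat pattern.
df <- withColumn(df, "as_date", to_date(df$first_non_na, "yyyy-MM-dd"))
df <- withColumn(df, "as_ts", to_timestamp(df$first_non_na, "yyyy-MM-dd"))

head(df)
sparkR.session.stop()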

R/pkg/R/generics.R

Lines changed: 18 additions & 3 deletions
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })

 # @rdname checkpoint-methods
 # @export
@@ -66,7 +66,7 @@ setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("fre
 # @rdname approxQuantile
 # @export
 setGeneric("approxQuantile",
-           function(x, col, probabilities, relativeError) {
+           function(x, cols, probabilities, relativeError) {
              standardGeneric("approxQuantile")
            })

@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })

+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })
@@ -1256,7 +1263,11 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })

 #' @rdname to_date
 #' @export
-setGeneric("to_date", function(x) { standardGeneric("to_date") })
+setGeneric("to_date", function(x, format) { standardGeneric("to_date") })
+
+#' @rdname to_timestamp
+#' @export
+setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") })

 #' @rdname to_utc_timestamp
 #' @export
@@ -1397,6 +1408,10 @@ setGeneric("spark.randomForest",
 #' @export
 setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })

+#' @rdname spark.svmLinear
+#' @export
+setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") })
+
 #' @rdname spark.lda
 #' @export
 setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") })

R/pkg/R/install.R

Lines changed: 13 additions & 3 deletions
@@ -21,9 +21,9 @@
 #' Download and Install Apache Spark to a Local Directory
 #'
 #' \code{install.spark} downloads and installs Spark to a local directory if
-#' it is not found. The Spark version we use is the same as the SparkR version.
-#' Users can specify a desired Hadoop version, the remote mirror site, and
-#' the directory where the package is installed locally.
+#' it is not found. If SPARK_HOME is set in the environment, and that directory is found, that is
+#' returned. The Spark version we use is the same as the SparkR version. Users can specify a desired
+#' Hadoop version, the remote mirror site, and the directory where the package is installed locally.
 #'
 #' The full url of remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
 #' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
@@ -68,6 +68,16 @@
 #' \href{http://spark.apache.org/downloads.html}{Apache Spark}
 install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                           localDir = NULL, overwrite = FALSE) {
+  sparkHome <- Sys.getenv("SPARK_HOME")
+  if (isSparkRShell()) {
+    stopifnot(nchar(sparkHome) > 0)
+    message("Spark is already running in sparkR shell.")
+    return(invisible(sparkHome))
+  } else if (!is.na(file.info(sparkHome)$isdir)) {
+    message("Spark package found in SPARK_HOME: ", sparkHome)
+    return(invisible(sparkHome))
+  }
+
   version <- paste0("spark-", packageVersion("SparkR"))
   hadoopVersion <- tolower(hadoopVersion)
   hadoopVersionName <- hadoopVersionName(hadoopVersion)
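
A small sketch of how the new SPARK_HOME short-circuit behaves; the paths below are hypothetical:

library(SparkR)

# If SPARK_HOME points at an existing directory, install.spark() now returns
# it invisibly instead of downloading a fresh copy.
Sys.setenv(SPARK_HOME = "/opt/spark-2.1.0-bin-hadoop2.7")
sparkHome <- install.spark()

# With SPARK_HOME unset (or pointing at a missing directory), the original
# download-and-install path is taken.
Sys.unsetenv("SPARK_HOME")
sparkHome <- install.spark(hadoopVersion = "2.7")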
