Commit be6d153

Merge remote-tracking branch 'upstream/master' into repartition-batch-size

2 parents: 60e2abd + 5b77e66

File tree

263 files changed: +3998, -1951 lines


R/pkg/R/DataFrame.R

Lines changed: 31 additions & 12 deletions
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
+#' Set options/mode and then return the write object
+#' @noRd
+setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
+  options <- varargsToStrEnv(...)
+  if (!is.null(path)) {
+    options[["path"]] <- path
+  }
+  jmode <- convertToJSaveMode(mode)
+  write <- callJMethod(write, "mode", jmode)
+  write <- callJMethod(write, "options", options)
+  write
+}
+
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
 #' @note write.json since 1.6.0
 setMethod("write.json",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "json", path))
           })
 
@@ -755,6 +771,8 @@ setMethod("write.json",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
 #' @note write.orc since 2.0.0
 setMethod("write.orc",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "orc", path))
           })
 
@@ -783,6 +802,8 @@ setMethod("write.orc",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
 #' @note write.parquet since 1.6.0
 setMethod("write.parquet",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "parquet", path))
           })
 
@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
 #' @note write.text since 2.0.0
 setMethod("write.text",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "text", path))
           })
 
@@ -2637,15 +2662,9 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
-            jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
-            if (!is.null(path)) {
-              options[["path"]] <- path
-            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
-            write <- callJMethod(write, "mode", jmode)
-            write <- callJMethod(write, "options", options)
+            write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })
 
@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
               source <- getDefaultSqlSource()
             }
             jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
+            options <- varargsToStrEnv(...)
 
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
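
Taken together, these hunks route all four typed writers (write.json, write.orc, write.parquet, write.text) through the new setWriteOptions helper, so each now accepts a save mode plus pass-through data source options. A minimal sketch of the resulting call pattern (the output paths and the compression option are illustrative, not taken from this diff):

df <- createDataFrame(mtcars)

# 'overwrite' replaces any existing output; the default mode is 'error',
# which fails if the path already exists
write.json(df, "/tmp/cars_json", mode = "overwrite")

# Extra named arguments flow through setWriteOptions() into the
# JVM-side DataFrameWriter.options() map
write.parquet(df, "/tmp/cars_parquet", mode = "append", compression = "snappy")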

R/pkg/R/SQLContext.R

Lines changed: 17 additions & 6 deletions
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' It goes through the entire dataset once to determine the schema.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.json
 #' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
 #' @name read.json
 #' @method read.json default
 #' @note read.json since 1.6.0
-read.json.default <- function(path) {
+read.json.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "json", paths)
   dataFrame(sdf)
 }
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 #' Loads an ORC file, returning the result as a SparkDataFrame.
 #'
 #' @param path Path of file to read.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.orc
 #' @export
 #' @name read.orc
 #' @note read.orc since 2.0.0
-read.orc <- function(path) {
+read.orc <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the ORC file path
   path <- suppressWarnings(normalizePath(path))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "orc", path)
   dataFrame(sdf)
 }
@@ -430,11 +436,13 @@ read.orc <- function(path) {
 #' @name read.parquet
 #' @method read.parquet default
 #' @note read.parquet since 1.6.0
-read.parquet.default <- function(path) {
+read.parquet.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "parquet", paths)
   dataFrame(sdf)
 }
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.text
 #' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
 #' @name read.text
 #' @method read.text default
 #' @note read.text since 1.6.1
-read.text.default <- function(path) {
+read.text.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "text", paths)
   dataFrame(sdf)
 }
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
                "in 'spark.sql.sources.default' configuration by default.")
   }
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
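
The read side mirrors the write side: each reader now captures its named varargs with varargsToStrEnv and applies them through DataFrameReader.options() before loading. A sketch of what this enables (the file path is hypothetical; primitivesAsString is a standard Spark JSON option used here for illustration):

# Logical TRUE is lowercased to the string "true" by varargsToStrEnv,
# so the JVM-side options map receives the form it expects
df <- read.json("/tmp/people.json", primitivesAsString = TRUE)
printSchema(df)  # with this option, all JSON primitives arrive as string columns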

R/pkg/R/generics.R

Lines changed: 6 additions & 4 deletions
@@ -651,23 +651,25 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 
 #' @rdname write.json
 #' @export
-setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })
 
 #' @rdname write.orc
 #' @export
-setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })
 
 #' @rdname write.parquet
 #' @export
-setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+setGeneric("write.parquet", function(x, path, ...) {
+  standardGeneric("write.parquet")
+})
 
 #' @rdname write.parquet
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
 #' @rdname write.text
 #' @export
-setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
 
 #' @rdname schema
 #' @export
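
Widening the generics to function(x, path, ...) is what allows the SparkDataFrame methods in DataFrame.R above to declare mode and option arguments without breaking S4 dispatch. A toy illustration of the pattern outside SparkR (write.demo is hypothetical and not part of this commit):

library(methods)

setGeneric("write.demo", function(x, path, ...) { standardGeneric("write.demo") })

# Because the generic carries a dots argument, a method may declare
# additional named arguments; callers pass them by name and they are
# matched through the dots
setMethod("write.demo", signature(x = "data.frame", path = "character"),
          function(x, path, mode = "error", ...) {
            message("saving to ", path, " with mode = ", mode)
          })

write.demo(data.frame(a = 1), "/tmp/out", mode = "overwrite")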

R/pkg/R/utils.R

Lines changed: 22 additions & 0 deletions
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
   env
 }
 
+# Utility function to capture the varargs into an environment object, with all
+# values converted to strings.
+varargsToStrEnv <- function(...) {
+  pairs <- list(...)
+  env <- new.env()
+  for (name in names(pairs)) {
+    value <- pairs[[name]]
+    if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+      stop(paste0("Unsupported type for ", name, " : ", class(value),
+                  ". Supported types are logical, numeric, character and NULL."))
+    }
+    if (is.logical(value)) {
+      env[[name]] <- tolower(as.character(value))
+    } else if (is.null(value)) {
+      env[[name]] <- value
+    } else {
+      env[[name]] <- as.character(value)
+    }
+  }
+  env
+}
+
 getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "DISK_ONLY_2",
                                          "MEMORY_AND_DISK",

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 10 additions & 0 deletions
@@ -481,6 +481,16 @@ test_that("spark.naiveBayes", {
     expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
     expect_equal(as.character(predict(m, t1[1, ])), "Yes")
   }
+
+  # Test numeric response variable
+  t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
+  t2 <- t1[-4]
+  df <- suppressWarnings(createDataFrame(t2))
+  m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
+  s <- summary(m)
+  expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
+  expect_equal(sum(s$apriori), 1)
+  expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
 })
 
 test_that("spark.survreg", {
