Skip to content

Commit 1d7242b

Browse files
author
Marcelo Vanzin
committed
Merge branch 'master' into SPARK-20648
2 parents a22c458 + 6ae1271 commit 1d7242b

File tree

58 files changed

+1479
-1224
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

58 files changed

+1479
-1224
lines changed

R/pkg/R/DataFrame.R

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,23 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
5858
#' Set options/mode and then return the write object
5959
#' @noRd
6060
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
61-
options <- varargsToStrEnv(...)
62-
if (!is.null(path)) {
63-
options[["path"]] <- path
64-
}
65-
jmode <- convertToJSaveMode(mode)
66-
write <- callJMethod(write, "mode", jmode)
67-
write <- callJMethod(write, "options", options)
68-
write
61+
options <- varargsToStrEnv(...)
62+
if (!is.null(path)) {
63+
options[["path"]] <- path
64+
}
65+
write <- setWriteMode(write, mode)
66+
write <- callJMethod(write, "options", options)
67+
write
68+
}
69+
70+
#' Set mode and then return the write object
71+
#' @noRd
72+
setWriteMode <- function(write, mode) {
73+
if (!is.character(mode)) {
74+
stop("mode should be character or omitted. It is 'error' by default.")
75+
}
76+
write <- handledCallJMethod(write, "mode", mode)
77+
write
6978
}
7079

7180
#' @export
@@ -556,9 +565,8 @@ setMethod("registerTempTable",
556565
setMethod("insertInto",
557566
signature(x = "SparkDataFrame", tableName = "character"),
558567
function(x, tableName, overwrite = FALSE) {
559-
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
560568
write <- callJMethod(x@sdf, "write")
561-
write <- callJMethod(write, "mode", jmode)
569+
write <- setWriteMode(write, ifelse(overwrite, "overwrite", "append"))
562570
invisible(callJMethod(write, "insertInto", tableName))
563571
})
564572

@@ -810,7 +818,8 @@ setMethod("toJSON",
810818
#'
811819
#' @param x A SparkDataFrame
812820
#' @param path The directory where the file is saved
813-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
821+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
822+
#' save mode (it is 'error' by default)
814823
#' @param ... additional argument(s) passed to the method.
815824
#'
816825
#' @family SparkDataFrame functions
@@ -841,7 +850,8 @@ setMethod("write.json",
841850
#'
842851
#' @param x A SparkDataFrame
843852
#' @param path The directory where the file is saved
844-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
853+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
854+
#' save mode (it is 'error' by default)
845855
#' @param ... additional argument(s) passed to the method.
846856
#'
847857
#' @family SparkDataFrame functions
@@ -872,7 +882,8 @@ setMethod("write.orc",
872882
#'
873883
#' @param x A SparkDataFrame
874884
#' @param path The directory where the file is saved
875-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
885+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
886+
#' save mode (it is 'error' by default)
876887
#' @param ... additional argument(s) passed to the method.
877888
#'
878889
#' @family SparkDataFrame functions
@@ -917,7 +928,8 @@ setMethod("saveAsParquetFile",
917928
#'
918929
#' @param x A SparkDataFrame
919930
#' @param path The directory where the file is saved
920-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
931+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
932+
#' save mode (it is 'error' by default)
921933
#' @param ... additional argument(s) passed to the method.
922934
#'
923935
#' @family SparkDataFrame functions
@@ -2871,18 +2883,19 @@ setMethod("except",
28712883
#' Additionally, mode is used to specify the behavior of the save operation when data already
28722884
#' exists in the data source. There are four modes:
28732885
#' \itemize{
2874-
#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
2875-
#' \item overwrite: Existing data is expected to be overwritten by the contents of this
2886+
#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
2887+
#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
28762888
#' SparkDataFrame.
2877-
#' \item error: An exception is expected to be thrown.
2878-
#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
2889+
#' \item 'error' or 'errorifexists': An exception is expected to be thrown.
2890+
#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
28792891
#' and to not change the existing data.
28802892
#' }
28812893
#'
28822894
#' @param df a SparkDataFrame.
28832895
#' @param path a name for the table.
28842896
#' @param source a name for external data source.
2885-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
2897+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
2898+
#' save mode (it is 'error' by default)
28862899
#' @param ... additional argument(s) passed to the method.
28872900
#'
28882901
#' @family SparkDataFrame functions
@@ -2940,17 +2953,18 @@ setMethod("saveDF",
29402953
#'
29412954
#' Additionally, mode is used to specify the behavior of the save operation when
29422955
#' data already exists in the data source. There are four modes: \cr
2943-
#' append: Contents of this SparkDataFrame are expected to be appended to existing data. \cr
2944-
#' overwrite: Existing data is expected to be overwritten by the contents of this
2956+
#' 'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr
2957+
#' 'overwrite': Existing data is expected to be overwritten by the contents of this
29452958
#' SparkDataFrame. \cr
2946-
#' error: An exception is expected to be thrown. \cr
2947-
#' ignore: The save operation is expected to not save the contents of the SparkDataFrame
2959+
#' 'error' or 'errorifexists': An exception is expected to be thrown. \cr
2960+
#' 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
29482961
#' and to not change the existing data. \cr
29492962
#'
29502963
#' @param df a SparkDataFrame.
29512964
#' @param tableName a name for the table.
29522965
#' @param source a name for external data source.
2953-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
2966+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
2967+
#' save mode (it is 'error' by default)
29542968
#' @param ... additional option(s) passed to the method.
29552969
#'
29562970
#' @family SparkDataFrame functions
@@ -2972,12 +2986,11 @@ setMethod("saveAsTable",
29722986
if (is.null(source)) {
29732987
source <- getDefaultSqlSource()
29742988
}
2975-
jmode <- convertToJSaveMode(mode)
29762989
options <- varargsToStrEnv(...)
29772990

29782991
write <- callJMethod(df@sdf, "write")
29792992
write <- callJMethod(write, "format", source)
2980-
write <- callJMethod(write, "mode", jmode)
2993+
write <- setWriteMode(write, mode)
29812994
write <- callJMethod(write, "options", options)
29822995
invisible(callJMethod(write, "saveAsTable", tableName))
29832996
})
@@ -3544,18 +3557,19 @@ setMethod("histogram",
35443557
#' Also, mode is used to specify the behavior of the save operation when
35453558
#' data already exists in the data source. There are four modes:
35463559
#' \itemize{
3547-
#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
3548-
#' \item overwrite: Existing data is expected to be overwritten by the contents of this
3560+
#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
3561+
#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
35493562
#' SparkDataFrame.
3550-
#' \item error: An exception is expected to be thrown.
3551-
#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
3563+
#' \item 'error' or 'errorifexists': An exception is expected to be thrown.
3564+
#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
35523565
#' and to not change the existing data.
35533566
#' }
35543567
#'
35553568
#' @param x a SparkDataFrame.
35563569
#' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
35573570
#' @param tableName the name of the table in the external database.
3558-
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
3571+
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
3572+
#' save mode (it is 'error' by default)
35593573
#' @param ... additional JDBC database connection properties.
35603574
#' @family SparkDataFrame functions
35613575
#' @rdname write.jdbc
@@ -3572,10 +3586,9 @@ setMethod("histogram",
35723586
setMethod("write.jdbc",
35733587
signature(x = "SparkDataFrame", url = "character", tableName = "character"),
35743588
function(x, url, tableName, mode = "error", ...) {
3575-
jmode <- convertToJSaveMode(mode)
35763589
jprops <- varargsToJProperties(...)
35773590
write <- callJMethod(x@sdf, "write")
3578-
write <- callJMethod(write, "mode", jmode)
3591+
write <- setWriteMode(write, mode)
35793592
invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
35803593
})
35813594

R/pkg/R/utils.R

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -736,15 +736,6 @@ splitString <- function(input) {
736736
Filter(nzchar, unlist(strsplit(input, ",|\\s")))
737737
}
738738

739-
convertToJSaveMode <- function(mode) {
740-
allModes <- c("append", "overwrite", "error", "ignore")
741-
if (!(mode %in% allModes)) {
742-
stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint
743-
}
744-
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
745-
jmode
746-
}
747-
748739
varargsToJProperties <- function(...) {
749740
pairs <- list(...)
750741
props <- newJObject("java.util.Properties")

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,10 @@ test_that("read/write json files", {
630630
jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".json")
631631
write.df(df, jsonPath2, "json", mode = "overwrite")
632632

633+
# Test errorifexists
634+
expect_error(write.df(df, jsonPath2, "json", mode = "errorifexists"),
635+
"analysis error - path file:.*already exists")
636+
633637
# Test write.json
634638
jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
635639
write.json(df, jsonPath3)
@@ -1371,6 +1375,9 @@ test_that("test HiveContext", {
13711375
expect_equal(count(df5), 3)
13721376
unlink(parquetDataPath)
13731377

1378+
# Invalid mode
1379+
expect_error(saveAsTable(df, "parquetest", "parquet", mode = "abc", path = parquetDataPath),
1380+
"illegal argument - Unknown save mode: abc")
13741381
unsetHiveContext()
13751382
}
13761383
})
@@ -3303,6 +3310,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
33033310
"Error in orc : analysis error - path file:.*already exists")
33043311
expect_error(write.parquet(df, jsonPath),
33053312
"Error in parquet : analysis error - path file:.*already exists")
3313+
expect_error(write.parquet(df, jsonPath, mode = 123), "mode should be character or omitted.")
33063314

33073315
# Arguments checking in R side.
33083316
expect_error(write.df(df, "data.tmp", source = c(1, 2)),

R/pkg/tests/fulltests/test_utils.R

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,6 @@ test_that("varargsToJProperties", {
158158
expect_equal(callJMethod(jprops, "size"), 0L)
159159
})
160160

161-
test_that("convertToJSaveMode", {
162-
s <- convertToJSaveMode("error")
163-
expect_true(class(s) == "jobj")
164-
expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
165-
expect_error(convertToJSaveMode("foo"),
166-
'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
167-
})
168-
169161
test_that("captureJVMException", {
170162
method <- "createStructField"
171163
expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,

bin/load-spark-env.cmd

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@ rem
1919

2020
rem This script loads spark-env.cmd if it exists, and ensures it is only loaded once.
2121
rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's
22-
rem conf/ subdirectory.
22+
rem conf\ subdirectory.
2323

2424
if [%SPARK_ENV_LOADED%] == [] (
2525
set SPARK_ENV_LOADED=1
2626

27-
if not [%SPARK_CONF_DIR%] == [] (
28-
set user_conf_dir=%SPARK_CONF_DIR%
29-
) else (
30-
set user_conf_dir=..\conf
27+
if [%SPARK_CONF_DIR%] == [] (
28+
set SPARK_CONF_DIR=%~dp0..\conf
3129
)
3230

3331
call :LoadSparkEnv
@@ -54,6 +52,6 @@ if [%SPARK_SCALA_VERSION%] == [] (
5452
exit /b 0
5553

5654
:LoadSparkEnv
57-
if exist "%user_conf_dir%\spark-env.cmd" (
58-
call "%user_conf_dir%\spark-env.cmd"
55+
if exist "%SPARK_CONF_DIR%\spark-env.cmd" (
56+
call "%SPARK_CONF_DIR%\spark-env.cmd"
5957
)

bin/load-spark-env.sh

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,12 @@ fi
2929
if [ -z "$SPARK_ENV_LOADED" ]; then
3030
export SPARK_ENV_LOADED=1
3131

32-
# Returns the parent of the directory this script lives in.
33-
parent_dir="${SPARK_HOME}"
32+
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
3433

35-
user_conf_dir="${SPARK_CONF_DIR:-"$parent_dir"/conf}"
36-
37-
if [ -f "${user_conf_dir}/spark-env.sh" ]; then
34+
if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
3835
# Promote all variable declarations to environment (exported) variables
3936
set -a
40-
. "${user_conf_dir}/spark-env.sh"
37+
. "${SPARK_CONF_DIR}/spark-env.sh"
4138
set +a
4239
fi
4340
fi

conf/spark-env.sh.template

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
3333
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
3434

35-
# Options read in YARN client mode
35+
# Options read in YARN client/cluster mode
36+
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
3637
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
3738
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
3839
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -449,11 +449,7 @@ class SparkContext(config: SparkConf) extends Logging {
449449

450450
_ui =
451451
if (conf.getBoolean("spark.ui.enabled", true)) {
452-
Some(SparkUI.create(Some(this), _statusStore, _conf,
453-
l => listenerBus.addToStatusQueue(l),
454-
_env.securityManager,
455-
appName,
456-
"",
452+
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
457453
startTime))
458454
} else {
459455
// For tests, do not enable the UI

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,13 +325,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
325325
}
326326

327327
val loadedUI = {
328-
val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf,
329-
l => replayBus.addListener(l),
330-
secManager,
331-
app.info.name,
328+
val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
332329
HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
333330
attempt.info.startTime.getTime(),
334-
appSparkVersion = attempt.info.appSparkVersion)
331+
attempt.info.appSparkVersion)
335332
LoadedAppUI(ui)
336333
}
337334

0 commit comments

Comments
 (0)