
Commit bb5a2af

felixcheung authored and shivaram committed
[SPARK-11340][SPARKR] Support setting driver properties when starting Spark from R programmatically or from RStudio
Mapping spark.driver.memory from sparkEnvir to spark-submit commandline arguments.

shivaram suggested that we possibly add other spark.driver.* properties - do we want to add all of those? I thought those could be set in SparkConf? sun-rui

Author: felixcheung <[email protected]>

Closes #9290 from felixcheung/rdrivermem.
1 parent 729f983 commit bb5a2af

File tree

3 files changed, +87 -13 lines
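For context, a minimal sketch of the user-facing behavior this commit enables, based on the documentation change below: driver properties such as spark.driver.memory are passed through the sparkEnvir argument of sparkR.init() and forwarded to the spark-submit command line before the driver JVM starts. The master, sparkHome path, and memory value here are placeholders, not recommendations.

# Hypothetical RStudio session; "/home/spark" and "2g" are illustrative values.
library(SparkR)
sc <- sparkR.init(master = "local[*]", appName = "SparkR",
                  sparkHome = "/home/spark",
                  sparkEnvir = list(spark.driver.memory = "2g"))
sqlContext <- sparkRSQL.init(sc)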

R/pkg/R/sparkR.R

Lines changed: 40 additions & 5 deletions
@@ -77,7 +77,9 @@ sparkR.stop <- function() {
 
 #' Initialize a new Spark Context.
 #'
-#' This function initializes a new SparkContext.
+#' This function initializes a new SparkContext. For details on how to initialize
+#' and use SparkR, refer to SparkR programming guide at
+#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
 #'
 #' @param master The Spark master URL.
 #' @param appName Application name to register with cluster manager
@@ -93,7 +95,7 @@ sparkR.stop <- function() {
 #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
 #'                  list(spark.executor.memory="1g"))
 #' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
-#'                  list(spark.executor.memory="1g"),
+#'                  list(spark.executor.memory="4g"),
 #'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
 #'                  c("jarfile1.jar","jarfile2.jar"))
 #'}
@@ -123,16 +125,21 @@ sparkR.init <- function(
     uriSep <- "////"
   }
 
+  sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
+
   existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
   if (existingPort != "") {
     backendPort <- existingPort
   } else {
     path <- tempfile(pattern = "backend_port")
+    submitOps <- getClientModeSparkSubmitOpts(
+        Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+        sparkEnvirMap)
     launchBackend(
         args = path,
         sparkHome = sparkHome,
         jars = jars,
-        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+        sparkSubmitOpts = submitOps,
        packages = sparkPackages)
     # wait atmost 100 seconds for JVM to launch
     wait <- 0.1
@@ -171,8 +178,6 @@ sparkR.init <- function(
     sparkHome <- suppressWarnings(normalizePath(sparkHome))
   }
 
-  sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
-
   sparkExecutorEnvMap <- convertNamedListToEnv(sparkExecutorEnv)
   if(is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
     sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
@@ -320,3 +325,33 @@ clearJobGroup <- function(sc) {
 cancelJobGroup <- function(sc, groupId) {
   callJMethod(sc, "cancelJobGroup", groupId)
 }
+
+sparkConfToSubmitOps <- new.env()
+sparkConfToSubmitOps[["spark.driver.memory"]]           <- "--driver-memory"
+sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- "--driver-class-path"
+sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options"
+sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path"
+
+# Utility function that returns Spark Submit arguments as a string
+#
+# A few Spark Application and Runtime environment properties cannot take effect after driver
+# JVM has started, as documented in:
+# http://spark.apache.org/docs/latest/configuration.html#application-properties
+# When starting SparkR without using spark-submit, for example, from Rstudio, add them to
+# spark-submit commandline if not already set in SPARKR_SUBMIT_ARGS so that they can be effective.
+getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
+  envirToOps <- lapply(ls(sparkConfToSubmitOps), function(conf) {
+    opsValue <- sparkEnvirMap[[conf]]
+    # process only if --option is not already specified
+    if (!is.null(opsValue) &&
+        nchar(opsValue) > 1 &&
+        !grepl(sparkConfToSubmitOps[[conf]], submitOps)) {
+      # put "" around value in case it has spaces
+      paste0(sparkConfToSubmitOps[[conf]], " \"", opsValue, "\" ")
+    } else {
+      ""
+    }
+  })
+  # --option must be before the application class "sparkr-shell" in submitOps
+  paste0(paste0(envirToOps, collapse = ""), submitOps)
+}
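To show how the new getClientModeSparkSubmitOpts() helper composes the spark-submit string, here is a small usage sketch that mirrors the unit tests added below; the environment contents and values are illustrative.

# Build the map the way sparkR.init() does, via convertNamedListToEnv(sparkEnvir).
e <- new.env()
e[["spark.driver.memory"]] <- "2g"

# Whitelisted driver options are prepended before the application class.
getClientModeSparkSubmitOpts("sparkr-shell", e)
# [1] "--driver-memory \"2g\" sparkr-shell"

# An option already present in SPARKR_SUBMIT_ARGS is not added again.
getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell", e)
# [1] "--driver-memory 4g sparkr-shell"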

R/pkg/inst/tests/test_context.R

Lines changed: 27 additions & 0 deletions
@@ -65,3 +65,30 @@ test_that("job group functions can be called", {
   cancelJobGroup(sc, "groupId")
   clearJobGroup(sc)
 })
+
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+  e <- new.env()
+  e[["spark.driver.memory"]] <- "512m"
+  ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
+  expect_equal("--driver-memory \"512m\" sparkrmain", ops)
+
+  e[["spark.driver.memory"]] <- "5g"
+  e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
+  e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
+  e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
+  e[["random"]] <- "skipthis"
+  ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
+  # nolint start
+  expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
+                            "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
+                            "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
+  # nolint end
+
+  e[["spark.driver.extraClassPath"]] <- "/" # too short
+  ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
+  # nolint start
+  expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
+                            "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
+                            " --driver-memory 4g sparkr-shell2"))
+  # nolint end
+})

docs/sparkr.md

Lines changed: 20 additions & 8 deletions
@@ -29,25 +29,37 @@ All of the examples on this page use sample data included in R or the Spark dist
 The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster.
 You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name
 , any spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`,
-which can be created from the SparkContext. If you are working from the SparkR shell, the
+which can be created from the SparkContext. If you are working from the `sparkR` shell, the
 `SQLContext` and `SparkContext` should already be created for you.
 
 {% highlight r %}
 sc <- sparkR.init()
 sqlContext <- sparkRSQL.init(sc)
 {% endhighlight %}
 
+In the event you are creating `SparkContext` instead of using `sparkR` shell or `spark-submit`, you
+could also specify certain Spark driver properties. Normally these
+[Application properties](configuration.html#application-properties) and
+[Runtime Environment](configuration.html#runtime-environment) cannot be set programmatically, as the
+driver JVM process would have been started, in this case SparkR takes care of this for you. To set
+them, pass them as you would other configuration properties in the `sparkEnvir` argument to
+`sparkR.init()`.
+
+{% highlight r %}
+sc <- sparkR.init("local[*]", "SparkR", "/home/spark", list(spark.driver.memory="2g"))
+{% endhighlight %}
+
 </div>
 
 ## Creating DataFrames
 With a `SQLContext`, applications can create `DataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources).
 
 ### From local data frames
-The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based using the `faithful` dataset from R.
+The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based using the `faithful` dataset from R.
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
-df <- createDataFrame(sqlContext, faithful)
+df <- createDataFrame(sqlContext, faithful)
 
 # Displays the content of the DataFrame to stdout
 head(df)
@@ -96,7 +108,7 @@ printSchema(people)
 </div>
 
 The data sources API can also be used to save out DataFrames into multiple file formats. For example we can save the DataFrame from the previous example
-to a Parquet file using `write.df`
+to a Parquet file using `write.df`
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
@@ -139,7 +151,7 @@ Here we include some basic examples and a complete list can be found in the [API
 <div data-lang="r" markdown="1">
 {% highlight r %}
 # Create the DataFrame
-df <- createDataFrame(sqlContext, faithful)
+df <- createDataFrame(sqlContext, faithful)
 
 # Get basic information about the DataFrame
 df
@@ -152,7 +164,7 @@ head(select(df, df$eruptions))
 ##2 1.800
 ##3 3.333
 
-# You can also pass in column name as strings
+# You can also pass in column name as strings
 head(select(df, "eruptions"))
 
 # Filter the DataFrame to only retain rows with wait times shorter than 50 mins
@@ -166,7 +178,7 @@ head(filter(df, df$waiting < 50))
 
 </div>
 
-### Grouping, Aggregation
+### Grouping, Aggregation
 
 SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below
 
@@ -194,7 +206,7 @@ head(arrange(waiting_counts, desc(waiting_counts$count)))
 
 ### Operating on Columns
 
-SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
+SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
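Putting the documentation change together with the whitelist added in sparkR.R, all four supported driver properties can be supplied the same way through sparkEnvir; the values below are placeholders taken from the unit tests, and any option already present in SPARKR_SUBMIT_ARGS keeps its existing value.

# Illustrative only; paths and sizes are not recommendations.
sc <- sparkR.init("local[*]", "SparkR", "/home/spark",
                  list(spark.driver.memory = "2g",
                       spark.driver.extraClassPath = "/opt/class_path",
                       spark.driver.extraJavaOptions = "-XX:+UseCompressedOops",
                       spark.driver.extraLibraryPath = "/usr/local/hadoop/lib"))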
