From c818556f2023f6f07d824897106e5db26bbd9066 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 21 Jun 2015 17:25:35 -0700 Subject: [PATCH 1/6] Add pakages to R --- R/pkg/R/client.R | 8 ++++++-- R/pkg/R/sparkR.R | 7 +++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R index 1281c41213e3..53d2da1b11e0 100644 --- a/R/pkg/R/client.R +++ b/R/pkg/R/client.R @@ -34,7 +34,7 @@ connectBackend <- function(hostname, port, timeout = 6000) { con } -launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) { +launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) { if (.Platform$OS.type == "unix") { sparkSubmitBinName = "spark-submit" } else { @@ -51,7 +51,11 @@ launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) { jars <- paste("--jars", jars) } - combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ") + if (packages != "") { + packages <- paste("--packages", packages) + } + + combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ") cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n") invisible(system2(sparkSubmitBin, combinedArgs, wait = F)) } diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 5ced7c688f98..f2fa3f44c1f4 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -81,6 +81,7 @@ sparkR.stop <- function() { #' @param sparkExecutorEnv Named list of environment variables to be used when launching executors. #' @param sparkJars Character string vector of jar files to pass to the worker nodes. #' @param sparkRLibDir The path where R is installed on the worker nodes. +#' @param sparkPackages Character string vector of packages #' @export #' @examples #'\dontrun{ @@ -100,7 +101,8 @@ sparkR.init <- function( sparkEnvir = list(), sparkExecutorEnv = list(), sparkJars = "", - sparkRLibDir = "") { + sparkRLibDir = "", + sparkPackages="") { if (exists(".sparkRjsc", envir = .sparkREnv)) { cat("Re-using existing Spark Context. 
Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n") @@ -129,7 +131,8 @@ sparkR.init <- function( args = path, sparkHome = sparkHome, jars = jars, - sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell")) + sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"), + sparkPackages = sparkPackages) # wait atmost 100 seconds for JVM to launch wait <- 0.1 for (i in 1:25) { From c1a9233bfb67a0425964bd35e3e6a12c83e86cb3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 22 Jun 2015 23:18:40 -0700 Subject: [PATCH 2/6] refactor for testing --- R/pkg/R/client.R | 24 ++++++++++++++++-------- R/pkg/R/sparkR.R | 4 ++-- R/pkg/inst/tests/test_client.R | 30 ++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 10 deletions(-) create mode 100644 R/pkg/inst/tests/test_client.R diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R index 53d2da1b11e0..cf2e5ddeb7a9 100644 --- a/R/pkg/R/client.R +++ b/R/pkg/R/client.R @@ -34,28 +34,36 @@ connectBackend <- function(hostname, port, timeout = 6000) { con } -launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) { +determineSparkSubmitBin <- function() { if (.Platform$OS.type == "unix") { sparkSubmitBinName = "spark-submit" } else { sparkSubmitBinName = "spark-submit.cmd" } + sparkSubmitBinName +} - if (sparkHome != "") { - sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName) - } else { - sparkSubmitBin <- sparkSubmitBinName - } - +generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) { if (jars != "") { jars <- paste("--jars", jars) } if (packages != "") { - packages <- paste("--packages", packages) + packages <- paste("--packages", packages) } combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ") + combinedArgs +} + +launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) { + sparkSubmitBin <- determineSparkSubmitBin() + if (sparkHome != "") { + sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName) + } else { + sparkSubmitBin <- sparkSubmitBinName + } + combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, sparkSubmitOpts, packages) cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n") invisible(system2(sparkSubmitBin, combinedArgs, wait = F)) } diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index f2fa3f44c1f4..243f51b584c4 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -81,7 +81,7 @@ sparkR.stop <- function() { #' @param sparkExecutorEnv Named list of environment variables to be used when launching executors. #' @param sparkJars Character string vector of jar files to pass to the worker nodes. #' @param sparkRLibDir The path where R is installed on the worker nodes. -#' @param sparkPackages Character string vector of packages +#' @param sparkPackages Character string vector of packages from spark-packages.org #' @export #' @examples #'\dontrun{ @@ -102,7 +102,7 @@ sparkR.init <- function( sparkExecutorEnv = list(), sparkJars = "", sparkRLibDir = "", - sparkPackages="") { + sparkPackages = "") { if (exists(".sparkRjsc", envir = .sparkREnv)) { cat("Re-using existing Spark Context. 
Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n") diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R new file mode 100644 index 000000000000..03ae73a7e550 --- /dev/null +++ b/R/pkg/inst/tests/test_client.R @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions in client.R") + +test_that("adding spark-testing-base as a package works", { + args <- generateSparkSubmitArgs("", "", "", "", + "holdenk:spark-testing-base:1.3.0_0.0.5") + expect_equal(args, + "--packages holdenk:spark-testing-base:1.3.0_0.0.5") +}) + +test_that("no package specified doesn't add packages flag", { + args <- generateSparkSubmitArgs("", "", "", "", "") + expect_equal(args, "") +}) From c7a4471a5d493a897af9c94c8e7f678a26240eb9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 22 Jun 2015 23:27:08 -0700 Subject: [PATCH 3/6] Add some documentation --- docs/sparkr.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/sparkr.md b/docs/sparkr.md index 4d82129921a3..cd7f3572b59d 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -27,9 +27,9 @@ All of the examples on this page use sample data included in R or the Spark dist
The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster. You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name -etc. Further, to work with DataFrames we will need a `SQLContext`, which can be created from the -SparkContext. If you are working from the SparkR shell, the `SQLContext` and `SparkContext` should -already be created for you. +, any spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`, +which can be created from the SparkContext. If you are working from the SparkR shell, the +`SQLContext` and `SparkContext` should already be created for you. {% highlight r %} sc <- sparkR.init() @@ -62,7 +62,9 @@ head(df) SparkR supports operating on a variety of data sources through the `DataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources. -The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path for the file to load and the type of data source. SparkR supports reading JSON and Parquet files natively and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro). +The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path for the file to load and the type of data source. SparkR supports reading JSON and Parquet files natively and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro). These packages can either be added by +specifying `--packages` with `sparm-submit` or `sparkR` commands, or if creating context through `init` +you can specify the packages with the `packages` argument. We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. 
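To make the workflow documented above concrete, here is a minimal end-to-end sketch (not part of the patches themselves) that uses the `sparkPackages` argument introduced in patch 1 together with the spark-csv connector referenced in the docs. The CSV path, the `header` option, and the `com.databricks.spark.csv` source name are illustrative and assume the spark-csv package's documented behaviour.

{% highlight r %}
# Start SparkR with the spark-csv package; sparkPackages is the argument
# added to sparkR.init in patch 1 and is forwarded to spark-submit as --packages.
sc <- sparkR.init(sparkPackages = "com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

# Read a CSV file through the data source the package provides
# (the path and the header option are illustrative).
cars <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", header = "true")
head(cars)

sparkR.stop()
{% endhighlight %}

Under the hood this only prepends a `--packages` flag to the spark-submit invocation, as the `generateSparkSubmitArgs` change in patch 2 shows.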
From 865a90cacdda09dcd9361fba10c2092172dce3a1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 23 Jun 2015 00:09:46 -0700 Subject: [PATCH 4/6] strip spaces for comparision --- R/pkg/inst/tests/test_client.R | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R index 03ae73a7e550..30b05c1a2afc 100644 --- a/R/pkg/inst/tests/test_client.R +++ b/R/pkg/inst/tests/test_client.R @@ -20,11 +20,13 @@ context("functions in client.R") test_that("adding spark-testing-base as a package works", { args <- generateSparkSubmitArgs("", "", "", "", "holdenk:spark-testing-base:1.3.0_0.0.5") - expect_equal(args, - "--packages holdenk:spark-testing-base:1.3.0_0.0.5") + expect_equal(gsub("[[:space:]]", "", args), + gsub("[[:space:]]", "", + "--packages holdenk:spark-testing-base:1.3.0_0.0.5")) }) test_that("no package specified doesn't add packages flag", { args <- generateSparkSubmitArgs("", "", "", "", "") - expect_equal(args, "") + expect_equal(gsub("[[:space:]]", "", args), + "") }) From fa8bc929ea0f1c24f3754fbaa9ddb47ac89e458f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 24 Jun 2015 10:59:20 -0700 Subject: [PATCH 5/6] typo: sparm -> spark --- docs/sparkr.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sparkr.md b/docs/sparkr.md index cd7f3572b59d..3501d4589f5e 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -63,7 +63,7 @@ head(df) SparkR supports operating on a variety of data sources through the `DataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources. The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path for the file to load and the type of data source. SparkR supports reading JSON and Parquet files natively and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro). These packages can either be added by -specifying `--packages` with `sparm-submit` or `sparkR` commands, or if creating context through `init` +specifying `--packages` with `spark-submit` or `sparkR` commands, or if creating context through `init` you can specify the packages with the `packages` argument. We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. From b60dd63de58141e2a0e4c0570c57635d6f20c647 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 24 Jun 2015 11:31:18 -0700 Subject: [PATCH 6/6] Add an example with the spark-csv package --- docs/sparkr.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/sparkr.md b/docs/sparkr.md index 3501d4589f5e..095ea4308cfe 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -66,6 +66,13 @@ The general method for creating DataFrames from data sources is `read.df`. This specifying `--packages` with `spark-submit` or `sparkR` commands, or if creating context through `init` you can specify the packages with the `packages` argument. +
{% highlight r %}
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}
+ We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.
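As a closing illustration, the following sketch shows what the helpers split out of `launchBackend` in patch 2 produce; it assumes `R/pkg/R/client.R` from this series has been sourced, and the backend arguments are illustrative.

{% highlight r %}
# determineSparkSubmitBin picks the spark-submit script name for the platform:
# "spark-submit" on unix-like systems, "spark-submit.cmd" otherwise.
sparkSubmitBin <- determineSparkSubmitBin()

# generateSparkSubmitArgs assembles the argument string handed to spark-submit.
combinedArgs <- generateSparkSubmitArgs(
  args = "sparkr-shell /tmp/sparkR-backend-port",  # illustrative backend arguments
  sparkHome = "",
  jars = "",
  sparkSubmitOpts = "",
  packages = "holdenk:spark-testing-base:1.3.0_0.0.5")

# combinedArgs is now
# " --packages holdenk:spark-testing-base:1.3.0_0.0.5  sparkr-shell /tmp/sparkR-backend-port"
cat(sparkSubmitBin, combinedArgs, "\n")
{% endhighlight %}

The empty `jars` and `sparkSubmitOpts` slots leave stray spaces in the `paste()` result, which is what the whitespace stripping added to the test comparisons in patch 4 accounts for.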