
Commit 214ba66

junyangq authored and shivaram committed
[SPARK-16579][SPARKR] add install.spark function
## What changes were proposed in this pull request?

Add an `install.spark` function to the SparkR package, so that users can run `install.spark()` to install Spark to a local directory within R.

Updates in this version:
- `install.spark()`
  - checks for an existing tar file in the cache folder and downloads it only if not found
  - mirror URL look-up priority: user-provided -> preferred mirror site from the Apache website -> hardcoded backup option
  - uses 2.0.0
- `sparkR.session()`
  - can install Spark when it is not found in `SPARK_HOME`

## How was this patch tested?

Manual tests, running the check-cran.sh script added in #14173.

Author: Junyang Qian <[email protected]>

Closes #14258 from junyangq/SPARK-16579.
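A minimal usage sketch of the new function, based on the description above (the mirror URL and local directory shown are hypothetical, not part of this patch):

```r
library(SparkR)

# Default: install the Spark build matching the SparkR version, for Hadoop 2.7,
# into the platform-specific cache directory, and set SPARK_HOME to it.
install.spark()

# Illustrative variants:
install.spark(hadoopVersion = "without")                      # "Hadoop free" build
install.spark(mirrorUrl = "http://apache.osuosl.org/spark")   # user-provided mirror
install.spark(localDir = "~/spark", overwrite = TRUE)         # custom dir, force re-download
```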
1 parent d4a9122 commit 214ba66

File tree

7 files changed (+267, -4 lines)


R/check-cran.sh

Lines changed: 1 addition & 1 deletion
@@ -47,6 +47,6 @@ $FWDIR/create-docs.sh
 
 VERSION=`grep Version $FWDIR/pkg/DESCRIPTION | awk '{print $NF}'`
 
-"$R_SCRIPT_PATH/"R CMD check --as-cran --no-tests SparkR_"$VERSION".tar.gz
+"$R_SCRIPT_PATH/"R CMD check --as-cran SparkR_"$VERSION".tar.gz
 
 popd > /dev/null

R/pkg/DESCRIPTION

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,7 @@ Author: The Apache Software Foundation
 Maintainer: Shivaram Venkataraman <[email protected]>
 Depends:
     R (>= 3.0),
-    methods,
+    methods
 Suggests:
     testthat,
     e1071,
@@ -31,6 +31,7 @@ Collate:
     'context.R'
     'deserialize.R'
     'functions.R'
+    'install.R'
     'mllib.R'
     'serialize.R'
     'sparkR.R'

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
@@ -352,3 +352,5 @@ S3method(structField, character)
 S3method(structField, jobj)
 S3method(structType, jobj)
 S3method(structType, structField)
+
+export("install.spark")

R/pkg/R/install.R

Lines changed: 235 additions & 0 deletions
@@ -0,0 +1,235 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Functions to install Spark in case the user directly downloads SparkR
+# from CRAN.
+
+#' Download and Install Apache Spark to a Local Directory
+#'
+#' \code{install.spark} downloads and installs Spark to a local directory if
+#' it is not found. The Spark version we use is the same as the SparkR version.
+#' Users can specify a desired Hadoop version, the remote mirror site, and
+#' the directory where the package is installed locally.
+#'
+#' The full url of remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
+#' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
+#' named after the Spark version (that corresponds to SparkR), and then the tar filename.
+#' The filename is composed of four parts, i.e. [Spark version]-bin-[Hadoop version].tgz.
+#' For example, the full path for a Spark 2.0.0 package for Hadoop 2.7 from
+#' \code{http://apache.osuosl.org} has path:
+#' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
+#' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is then
+#' \code{without-hadoop}.
+#'
+#' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can take other
+#'                      version number in the format of "x.y" where x and y are integer.
+#'                      If \code{hadoopVersion = "without"}, "Hadoop free" build is installed.
+#'                      See
+#'                      \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
+#'                      "Hadoop Free" Build} for more information.
+#'                      Other patched version names can also be used, e.g. \code{"cdh4"}
+#' @param mirrorUrl base URL of the repositories to use. The directory layout should follow
+#'                  \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
+#' @param localDir a local directory where Spark is installed. The directory contains
+#'                 version-specific folders of Spark packages. Default is path to
+#'                 the cache directory:
+#' \itemize{
+#'   \item Mac OS X: \file{~/Library/Caches/spark}
+#'   \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
+#'   \item Windows: \file{\%LOCALAPPDATA\%\\spark\\spark\\Cache}. See
+#'         \href{https://www.microsoft.com/security/portal/mmpc/shared/variables.aspx}{
+#'         Windows Common Folder Variables} about \%LOCALAPPDATA\%
+#' }
+#' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
+#'                  and force re-install Spark (in case the local directory or file is corrupted)
+#' @return \code{install.spark} returns the local directory where Spark is found or installed
#' @rdname install.spark
+#' @name install.spark
+#' @aliases install.spark
+#' @export
+#' @examples
+#'\dontrun{
+#' install.spark()
+#'}
+#' @note install.spark since 2.1.0
+#' @seealso See available Hadoop versions:
+#'          \href{http://spark.apache.org/downloads.html}{Apache Spark}
+install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
+                          localDir = NULL, overwrite = FALSE) {
+  version <- paste0("spark-", packageVersion("SparkR"))
+  hadoopVersion <- tolower(hadoopVersion)
+  hadoopVersionName <- hadoop_version_name(hadoopVersion)
+  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
+  localDir <- ifelse(is.null(localDir), spark_cache_path(),
+                     normalizePath(localDir, mustWork = FALSE))
+
+  if (is.na(file.info(localDir)$isdir)) {
+    dir.create(localDir, recursive = TRUE)
+  }
+
+  packageLocalDir <- file.path(localDir, packageName)
+
+  if (overwrite) {
+    message(paste0("Overwrite = TRUE: download and overwrite the tar file",
+                   "and Spark package directory if they exist."))
+  }
+
+  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
+  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
+    fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
+    msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
+                   packageLocalDir)
+    message(msg)
+    Sys.setenv(SPARK_HOME = packageLocalDir)
+    return(invisible(packageLocalDir))
+  }
+
+  packageLocalPath <- paste0(packageLocalDir, ".tgz")
+  tarExists <- file.exists(packageLocalPath)
+
+  if (tarExists && !overwrite) {
+    message("tar file found.")
+  } else {
+    robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+  }
+
+  message(sprintf("Installing to %s", localDir))
+  untar(tarfile = packageLocalPath, exdir = localDir)
+  if (!tarExists || overwrite) {
+    unlink(packageLocalPath)
+  }
+  message("DONE.")
+  Sys.setenv(SPARK_HOME = packageLocalDir)
+  message(paste("SPARK_HOME set to", packageLocalDir))
+  invisible(packageLocalDir)
+}
+
+robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+  # step 1: use user-provided url
+  if (!is.null(mirrorUrl)) {
+    msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
+    message(msg)
+    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+                                   packageName, packageLocalPath)
+    if (success) return()
+  } else {
+    message("Mirror site not provided.")
+  }
+
+  # step 2: use url suggested from apache website
+  message("Looking for site suggested from apache website...")
+  mirrorUrl <- get_preferred_mirror(version, packageName)
+  if (!is.null(mirrorUrl)) {
+    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+                                   packageName, packageLocalPath)
+    if (success) return()
+  } else {
+    message("Unable to find suggested mirror site.")
+  }
+
+  # step 3: use backup option
+  message("To use backup site...")
+  mirrorUrl <- default_mirror_url()
+  success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+                                 packageName, packageLocalPath)
+  if (success) {
+    return(packageLocalPath)
+  } else {
+    msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
+                         "Please check network connection, Hadoop version,",
+                         "or provide other mirror sites."),
+                   version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion))
+    stop(msg)
+  }
+}
+
+get_preferred_mirror <- function(version, packageName) {
+  jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
+                    file.path("spark", version, packageName),
+                    ".tgz&as_json=1")
+  textLines <- readLines(jsonUrl, warn = FALSE)
+  rowNum <- grep("\"preferred\"", textLines)
+  linePreferred <- textLines[rowNum]
+  matchInfo <- regexpr("\"[A-Za-z][A-Za-z0-9+-.]*://.+\"", linePreferred)
+  if (matchInfo != -1) {
+    startPos <- matchInfo + 1
+    endPos <- matchInfo + attr(matchInfo, "match.length") - 2
+    mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
+    mirrorPreferred <- paste0(mirrorPreferred, "spark")
+    message(sprintf("Preferred mirror site found: %s", mirrorPreferred))
+  } else {
+    mirrorPreferred <- NULL
+  }
+  mirrorPreferred
+}
+
+direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+  packageRemotePath <- paste0(
+    file.path(mirrorUrl, version, packageName), ".tgz")
+  fmt <- paste("Downloading Spark %s for Hadoop %s from:\n- %s")
+  msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
+                 packageRemotePath)
+  message(msg)
+
+  isFail <- tryCatch(download.file(packageRemotePath, packageLocalPath),
+                     error = function(e) {
+                       message(sprintf("Fetch failed from %s", mirrorUrl))
+                       print(e)
+                       TRUE
+                     })
+  !isFail
+}
+
+default_mirror_url <- function() {
+  "http://www-us.apache.org/dist/spark"
+}
+
+hadoop_version_name <- function(hadoopVersion) {
+  if (hadoopVersion == "without") {
+    "without-hadoop"
+  } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
+    paste0("hadoop", hadoopVersion)
+  } else {
+    hadoopVersion
+  }
+}
+
+# The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
+# adapt to Spark context
+spark_cache_path <- function() {
+  if (.Platform$OS.type == "windows") {
+    winAppPath <- Sys.getenv("%LOCALAPPDATA%", unset = NA)
+    if (is.na(winAppPath)) {
+      msg <- paste("%LOCALAPPDATA% not found.",
+                   "Please define the environment variable",
+                   "or restart and enter an installation path in localDir.")
+      stop(msg)
+    } else {
+      path <- file.path(winAppPath, "spark", "spark", "Cache")
+    }
+  } else if (.Platform$OS.type == "unix") {
+    if (Sys.info()["sysname"] == "Darwin") {
+      path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
+    } else {
+      path <- file.path(
+        Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
+    }
+  } else {
+    stop(sprintf("Unknown OS: %s", .Platform$OS.type))
+  }
+  normalizePath(path, mustWork = FALSE)
+}

R/pkg/R/sparkR.R

Lines changed: 17 additions & 0 deletions
@@ -365,6 +365,23 @@ sparkR.session <- function(
     }
     overrideEnvs(sparkConfigMap, paramMap)
   }
+  # do not download if it is run in the sparkR shell
+  if (!nzchar(master) || is_master_local(master)) {
+    if (!is_sparkR_shell()) {
+      if (is.na(file.info(sparkHome)$isdir)) {
+        msg <- paste0("Spark not found in SPARK_HOME: ",
+                      sparkHome,
+                      " .\nTo search in the cache directory. ",
+                      "Installation will start if not found.")
+        message(msg)
+        packageLocalDir <- install.spark()
+        sparkHome <- packageLocalDir
+      } else {
+        msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome)
+        message(msg)
+      }
+    }
+  }
 
   if (!exists(".sparkRjsc", envir = .sparkREnv)) {
     sparkExecutorEnvMap <- new.env()

R/pkg/R/utils.R

Lines changed: 8 additions & 0 deletions
@@ -689,3 +689,11 @@ getSparkContext <- function() {
   sc <- get(".sparkRjsc", envir = .sparkREnv)
   sc
 }
+
+is_master_local <- function(master) {
+  grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE)
+}
+
+is_sparkR_shell <- function() {
+  grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
+}

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 2 additions & 2 deletions
@@ -1824,11 +1824,11 @@ test_that("describe() and summarize() on a DataFrame", {
   expect_equal(collect(stats)[2, "age"], "24.5")
   expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
   stats <- describe(df)
-  expect_equal(collect(stats)[4, "name"], "Andy")
+  expect_equal(collect(stats)[4, "summary"], "min")
   expect_equal(collect(stats)[5, "age"], "30")
 
   stats2 <- summary(df)
-  expect_equal(collect(stats2)[4, "name"], "Andy")
+  expect_equal(collect(stats2)[4, "summary"], "min")
   expect_equal(collect(stats2)[5, "age"], "30")
 
   # SPARK-16425: SparkR summary() fails on column of type logical
