[SPARK-16579][SparkR] add install.spark function #14258
R/pkg/DESCRIPTION
@@ -7,7 +7,7 @@ Author: The Apache Software Foundation
 Maintainer: Shivaram Venkataraman <[email protected]>
 Depends:
     R (>= 3.0),
-    methods,
+    methods
 Suggests:
     testthat,
     e1071,
@@ -31,6 +31,7 @@ Collate:
     'context.R'
     'deserialize.R'
     'functions.R'
+    'install.R'
     'mllib.R'
     'serialize.R'
     'sparkR.R'
R/pkg/R/install.R (new file)
@@ -0,0 +1,235 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Functions to install Spark in case the user directly downloads SparkR
# from CRAN.

#' Download and Install Apache Spark to a Local Directory
#'
#' \code{install.spark} downloads and installs Spark to a local directory if
#' it is not found. The Spark version we use is the same as the SparkR version.
#' Users can specify a desired Hadoop version, the remote mirror site, and
#' the directory where the package is installed locally.
#'
#' The full URL of the remote file is inferred from \code{mirrorUrl} and
#' \code{hadoopVersion}. \code{mirrorUrl} specifies the remote path to a Spark
#' folder. It is followed by a subfolder named after the Spark version (that
#' corresponds to SparkR), and then the tar filename. The filename is composed
#' of four parts, i.e. [Spark version]-bin-[Hadoop version].tgz. For example,
#' the full path for a Spark 2.0.0 package for Hadoop 2.7 from
#' \code{http://apache.osuosl.org} has path:
#' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
#' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is
#' then \code{without-hadoop}.
#'
#' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can also take
#'        other version numbers in the format of "x.y" where x and y are integers.
#'        If \code{hadoopVersion = "without"}, the "Hadoop free" build is installed.
#'        See
#'        \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
#'        "Hadoop Free" Build} for more information.
#'        Other patched version names can also be used, e.g. \code{"cdh4"}.
#' @param mirrorUrl base URL of the repositories to use. The directory layout should follow
#'        \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' @param localDir a local directory where Spark is installed. The directory contains
#'        version-specific folders of Spark packages. Default is the path to
#'        the cache directory:
#'        \itemize{
#'          \item Mac OS X: \file{~/Library/Caches/spark}
#'          \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
#'          \item Windows: \file{\%LOCALAPPDATA\%\\spark\\spark\\Cache}. See
#'                \href{https://www.microsoft.com/security/portal/mmpc/shared/variables.aspx}{
#'                Windows Common Folder Variables} about \%LOCALAPPDATA\%
#'        }
#' @param overwrite if \code{TRUE}, download and overwrite the existing tar file in localDir
#'        and force re-installation of Spark (in case the local directory or file is corrupted)
#' @return \code{install.spark} returns the local directory where Spark is found or installed
#' @rdname install.spark
#' @name install.spark
Member: add @aliases

Member: I think it's supposed to have the parameter types like this: …

Contributor (Author): I thought it was the format for S4 methods? Do we want to make it S4?

Contributor: I'd vote to leave this as an S3 method. Also I think the CRAN checks should show the missing aliases if there are any.

Contributor (Author): I think it's just a normal function for now. @shivaram Did you mean that or the S3 method? Just to clarify...

Member: ah you are right - I've missed the fact that it's not S4.

Contributor (Author): sounds good, thanks!
#' @aliases install.spark
#' @export
#' @examples
#'\dontrun{
#' install.spark()
#'}
#' @note install.spark since 2.1.0
#' @seealso See available Hadoop versions:
#' \href{http://spark.apache.org/downloads.html}{Apache Spark}
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                          localDir = NULL, overwrite = FALSE) {
  version <- paste0("spark-", packageVersion("SparkR"))
Contributor: Does it mean that we can only publish SparkR with released Spark versions? Then how do we make patched releases, say "2.0.0-1"? Can we overwrite an existing release on CRAN? cc: @felixcheung

Member: that's an excellent point. we could have a fourth digit (separated with …). In that case we could patch the SparkR package and still have it match major/minor/patch with Spark.
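A minimal sketch of the fourth-digit idea above (hypothetical, not part of this PR): a CRAN patch release of SparkR such as "2.0.0.1" could still be mapped back to the three-component Spark release it wraps.

# Hypothetical helper, for illustration only: keep the first three version
# components so a patched SparkR "2.0.0.1" still resolves to Spark "2.0.0".
sparkVersionFromSparkR <- function(pkgVersion) {
  components <- unlist(strsplit(as.character(pkgVersion), "[.-]"))
  paste(components[1:3], collapse = ".")
}
sparkVersionFromSparkR("2.0.0.1")  # "2.0.0"
sparkVersionFromSparkR("2.0.0")    # "2.0.0"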
  hadoopVersion <- tolower(hadoopVersion)
  hadoopVersionName <- hadoop_version_name(hadoopVersion)
  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
  localDir <- ifelse(is.null(localDir), spark_cache_path(),
                     normalizePath(localDir, mustWork = FALSE))
  if (is.na(file.info(localDir)$isdir)) {
    dir.create(localDir, recursive = TRUE)
  }

  packageLocalDir <- file.path(localDir, packageName)

  if (overwrite) {
    message(paste0("Overwrite = TRUE: download and overwrite the tar file ",
                   "and Spark package directory if they exist."))
  }

Contributor: remove extra empty line
  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
    fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
    msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                   packageLocalDir)
    message(msg)
    Sys.setenv(SPARK_HOME = packageLocalDir)
    return(invisible(packageLocalDir))
  }

  packageLocalPath <- paste0(packageLocalDir, ".tgz")
  tarExists <- file.exists(packageLocalPath)

  if (tarExists && !overwrite) {
    message("tar file found.")
  } else {
    robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
  }

  message(sprintf("Installing to %s", localDir))
  untar(tarfile = packageLocalPath, exdir = localDir)
  if (!tarExists || overwrite) {
    unlink(packageLocalPath)
  }
  message("DONE.")
  Sys.setenv(SPARK_HOME = packageLocalDir)
  message(paste("SPARK_HOME set to", packageLocalDir))
  invisible(packageLocalDir)
}
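For reference, a usage sketch based on the documented parameters (the localDir value below is a hypothetical example):

library(SparkR)

# Default: Spark matching the SparkR version, prebuilt for Hadoop 2.7,
# installed under the platform-specific cache directory described above.
install.spark()

# "Hadoop free" build from an explicit mirror into a custom directory,
# re-downloading even if a possibly corrupted copy already exists.
install.spark(hadoopVersion = "without",
              mirrorUrl = "http://apache.osuosl.org/spark",
              localDir = "~/spark-installs",
              overwrite = TRUE)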
robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  # step 1: use user-provided url
  if (!is.null(mirrorUrl)) {
    msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
    message(msg)
    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
                                   packageName, packageLocalPath)
    if (success) return()
  } else {
    message("Mirror site not provided.")
  }
  # step 2: use url suggested from apache website
  message("Looking for a site suggested from apache website...")
  mirrorUrl <- get_preferred_mirror(version, packageName)
  if (!is.null(mirrorUrl)) {
    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
                                   packageName, packageLocalPath)
    if (success) return()
  } else {
    message("Unable to find suggested mirror site.")
  }

  # step 3: use backup option
  message("Using the backup site...")
  mirrorUrl <- default_mirror_url()
  success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
  if (success) {
    return(packageLocalPath)
  } else {
    msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                         "Please check network connection, Hadoop version,",
                         "or provide other mirror sites."),
                   version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion))
    stop(msg)
  }
}
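With mirrorUrl = NULL, a typical successful run would emit messages in this order (trace reconstructed from the code above; the mirror is illustrative):

# Mirror site not provided.
# Looking for a site suggested from apache website...
# Preferred mirror site found: http://apache.osuosl.org/spark
# Downloading Spark spark-2.0.0 for Hadoop 2.7 from:
# - http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz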
get_preferred_mirror <- function(version, packageName) {
  jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                    file.path("spark", version, packageName),
                    ".tgz&as_json=1")
  textLines <- readLines(jsonUrl, warn = FALSE)
  rowNum <- grep("\"preferred\"", textLines)
  linePreferred <- textLines[rowNum]
  matchInfo <- regexpr("\"[A-Za-z][A-Za-z0-9+-.]*://.+\"", linePreferred)
  if (matchInfo != -1) {
    startPos <- matchInfo + 1
    endPos <- matchInfo + attr(matchInfo, "match.length") - 2
    mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
    mirrorPreferred <- paste0(mirrorPreferred, "spark")
    message(sprintf("Preferred mirror site found: %s", mirrorPreferred))
  } else {
    mirrorPreferred <- NULL
  }
  mirrorPreferred
}
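To illustrate the extraction above: the closer.cgi endpoint with as_json=1 returns JSON whose "preferred" field names the suggested mirror. The sample line below is an assumption for illustration, not captured output:

# Applying the same regex to a representative "preferred" line of the JSON:
linePreferred <- "  \"preferred\": \"http://apache.osuosl.org/\""  # assumed sample
matchInfo <- regexpr("\"[A-Za-z][A-Za-z0-9+-.]*://.+\"", linePreferred)
startPos <- matchInfo + 1                                  # skip opening quote
endPos <- matchInfo + attr(matchInfo, "match.length") - 2  # drop closing quote
paste0(substr(linePreferred, startPos, endPos), "spark")
# [1] "http://apache.osuosl.org/spark"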
direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  packageRemotePath <- paste0(
    file.path(mirrorUrl, version, packageName), ".tgz")
  fmt <- "Downloading Spark %s for Hadoop %s from:\n- %s"
  msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                 packageRemotePath)
  message(msg)

  isFail <- tryCatch(download.file(packageRemotePath, packageLocalPath),
                     error = function(e) {
                       message(sprintf("Fetch failed from %s", mirrorUrl))
Member: might want to …

Contributor (Author): Good point. Thanks!
                       print(e)
                       TRUE
                     })
  !isFail
}
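The remote path assembled above reproduces the example URL in the documentation (the mirror is illustrative):

file.path("http://apache.osuosl.org/spark", "spark-2.0.0",
          "spark-2.0.0-bin-hadoop2.7")
# [1] "http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7"
# appending ".tgz" yields .../spark-2.0.0-bin-hadoop2.7.tgz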
default_mirror_url <- function() {
  "http://www-us.apache.org/dist/spark"
}

hadoop_version_name <- function(hadoopVersion) {
  if (hadoopVersion == "without") {
    "without-hadoop"
  } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
    paste0("hadoop", hadoopVersion)
  } else {
    hadoopVersion
Member: shouldn't it fail here? I think we shouldn't let arbitrary strings through?

Contributor (Author): I was thinking about the names of user-patched versions, e.g. cdh4 - wondering if there is a way to tell such names from other invalid ones?

Member: There is usually a format? so …

Contributor (Author): Are we trying to validate the …

Contributor: FWIW I don't think it's worth verifying the …
  }
}
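The mapping this helper produces, derived from the branches above:

hadoop_version_name("2.7")      # "hadoop2.7"
hadoop_version_name("without")  # "without-hadoop"
hadoop_version_name("cdh4")     # "cdh4" (patched version names pass through unchanged)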
# The implementation refers to the appdirs package: https://pypi.python.org/pypi/appdirs,
# adapted to the Spark context
spark_cache_path <- function() {
Contributor: Are there references about the implementation here?

Contributor (Author): Yes, those actually refer to the implementation of rappdirs/R/cache.r. Should add a reference here.
  if (.Platform$OS.type == "windows") {
    # the environment variable name is LOCALAPPDATA; the %...% form is only
    # the Windows shell syntax for expanding it, so query the bare name here
    winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
    if (is.na(winAppPath)) {
      msg <- paste("%LOCALAPPDATA% not found.",
                   "Please define the environment variable",
                   "or restart and enter an installation path in localDir.")
      stop(msg)
    } else {
      path <- file.path(winAppPath, "spark", "spark", "Cache")
    }
  } else if (.Platform$OS.type == "unix") {
    if (Sys.info()["sysname"] == "Darwin") {
      path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
Contributor: Need to check whether the folder exists and create the folder if it is not. I got this error in the first run …

Contributor (Author): I changed to …
    } else {
      path <- file.path(
        Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
    }
  } else {
    stop(sprintf("Unknown OS: %s", .Platform$OS.type))
  }
  normalizePath(path, mustWork = FALSE)
}
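Illustrative results per platform, derived from the branches above (the user name is hypothetical):

# macOS:   "/Users/alice/Library/Caches/spark"
# Linux:   "/home/alice/.cache/spark"            (when XDG_CACHE_HOME is unset)
# Windows: "C:\Users\alice\AppData\Local\spark\spark\Cache"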
Reviewer (on the hadoopVersion doc): "are integers"?

Author: Yes, thanks!