
Commit 1c1a608

Merge pull request apache-spark-on-k8s#284 from palantir/aash/resync-apache
[NOSQUASH] Resync from Apache
2 parents 65956b7 + 5d9fbec commit 1c1a608

277 files changed: +8771 −7436 lines


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -232,6 +232,7 @@ exportMethods("%<=>%",
               "date_sub",
               "datediff",
               "dayofmonth",
+              "dayofweek",
               "dayofyear",
               "decode",
               "dense_rank",

R/pkg/R/functions.R

Lines changed: 16 additions & 1 deletion
@@ -696,7 +696,7 @@ setMethod("hash",
 #'
 #' \dontrun{
 #' head(select(df, df$time, year(df$time), quarter(df$time), month(df$time),
-#'             dayofmonth(df$time), dayofyear(df$time), weekofyear(df$time)))
+#'             dayofmonth(df$time), dayofweek(df$time), dayofyear(df$time), weekofyear(df$time)))
 #' head(agg(groupBy(df, year(df$time)), count(df$y), avg(df$y)))
 #' head(agg(groupBy(df, month(df$time)), avg(df$y)))}
 #' @note dayofmonth since 1.5.0
@@ -707,6 +707,21 @@ setMethod("dayofmonth",
             column(jc)
           })
 
+#' @details
+#' \code{dayofweek}: Extracts the day of the week as an integer from a
+#' given date/timestamp/string.
+#'
+#' @rdname column_datetime_functions
+#' @aliases dayofweek dayofweek,Column-method
+#' @export
+#' @note dayofweek since 2.3.0
+setMethod("dayofweek",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "dayofweek", x@jc)
+            column(jc)
+          })
+
 #' @details
 #' \code{dayofyear}: Extracts the day of the year as an integer from a
 #' given date/timestamp/string.

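As a quick illustration (not part of the diff), a minimal usage sketch for the new column function, assuming an active SparkR session and a DataFrame df with a date/timestamp column named time:

    # hedged sketch: dayofweek() returns an integer from 1 (Sunday) to 7 (Saturday)
    library(SparkR)
    head(select(df, df$time, dayofweek(df$time), dayofmonth(df$time)))
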
R/pkg/R/generics.R

Lines changed: 5 additions & 0 deletions
@@ -1048,6 +1048,11 @@ setGeneric("date_sub", function(y, x) { standardGeneric("date_sub") })
 #' @name NULL
 setGeneric("dayofmonth", function(x) { standardGeneric("dayofmonth") })
 
+#' @rdname column_datetime_functions
+#' @export
+#' @name NULL
+setGeneric("dayofweek", function(x) { standardGeneric("dayofweek") })
+
 #' @rdname column_datetime_functions
 #' @export
 #' @name NULL

R/pkg/R/install.R

Lines changed: 36 additions & 1 deletion
@@ -152,6 +152,11 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
     })
   if (!tarExists || overwrite || !success) {
     unlink(packageLocalPath)
+    if (success) {
+      # if tar file was not there before (or it was, but we are told to overwrite it),
+      # and untar is successful - set a flag that we have downloaded (and untar) Spark package.
+      assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
+    }
   }
   if (!success) stop("Extract archive failed.")
   message("DONE.")
@@ -266,6 +271,7 @@ hadoopVersionName <- function(hadoopVersion) {
 
 # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
+# see also sparkCacheRelPathLength()
 sparkCachePath <- function() {
   if (is_windows()) {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
@@ -282,7 +288,7 @@ sparkCachePath <- function() {
     }
   } else if (.Platform$OS.type == "unix") {
     if (Sys.info()["sysname"] == "Darwin") {
-      path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
+      path <- file.path(Sys.getenv("HOME"), "Library", "Caches", "spark")
     } else {
       path <- file.path(
         Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
@@ -293,6 +299,16 @@ sparkCachePath <- function() {
   normalizePath(path, mustWork = FALSE)
 }
 
+# Length of the Spark cache specific relative path segments for each platform
+# eg. "Apache\Spark\Cache" is 3 in Windows, or "spark" is 1 in unix
+# Must match sparkCachePath() exactly.
+sparkCacheRelPathLength <- function() {
+  if (is_windows()) {
+    3
+  } else {
+    1
+  }
+}
 
 installInstruction <- function(mode) {
   if (mode == "remote") {
@@ -310,3 +326,22 @@ installInstruction <- function(mode) {
     stop(paste0("No instruction found for ", mode, " mode."))
   }
 }
+
+uninstallDownloadedSpark <- function() {
+  # clean up if Spark was downloaded
+  sparkDownloaded <- getOne(".sparkDownloaded",
+                            envir = .sparkREnv,
+                            inherits = TRUE,
+                            ifnotfound = FALSE)
+  sparkDownloadedDir <- Sys.getenv("SPARK_HOME")
+  if (sparkDownloaded && nchar(sparkDownloadedDir) > 0) {
+    unlink(sparkDownloadedDir, recursive = TRUE, force = TRUE)
+
+    dirs <- traverseParentDirs(sparkCachePath(), sparkCacheRelPathLength())
+    lapply(dirs, function(d) {
+      if (length(list.files(d, all.files = TRUE, include.dirs = TRUE, no.. = TRUE)) == 0) {
+        unlink(d, recursive = TRUE, force = TRUE)
+      }
+    })
+  }
+}

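For orientation (not part of the diff), a minimal sketch of what the cleanup above walks through on macOS, assuming HOME is /Users/alice and the default cache location; the helpers are internal, hence the ":::" access:

    # hedged sketch: sparkCachePath() resolves to "/Users/alice/Library/Caches/spark"
    # and sparkCacheRelPathLength() is 1, so the walk covers the cache dir and its parent
    dirs <- SparkR:::traverseParentDirs(SparkR:::sparkCachePath(),
                                        SparkR:::sparkCacheRelPathLength())
    # dirs == c("/Users/alice/Library/Caches/spark", "/Users/alice/Library/Caches");
    # uninstallDownloadedSpark() then unlinks each of these only if it is empty
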
R/pkg/R/utils.R

Lines changed: 13 additions & 0 deletions
@@ -910,3 +910,16 @@ hadoop_home_set <- function() {
 windows_with_hadoop <- function() {
   !is_windows() || hadoop_home_set()
 }
+
+# get0 not supported before R 3.2.0
+getOne <- function(x, envir, inherits = TRUE, ifnotfound = NULL) {
+  mget(x[1L], envir = envir, inherits = inherits, ifnotfound = list(ifnotfound))[[1L]]
+}
+
+# Returns a vector of parent directories, traversing up count times, starting with a full path
+# eg. traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1) should return
+# this "/Users/user/Library/Caches/spark/spark2.2"
+# and "/Users/user/Library/Caches/spark"
+traverseParentDirs <- function(x, count) {
+  if (dirname(x) == x || count <= 0) x else c(x, Recall(dirname(x), count - 1))
+}

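A short illustration (not from the diff) of getOne acting as a stand-in for base R's get0, which is unavailable before R 3.2.0:

    # hedged sketch: the ifnotfound default is returned when the name is unbound
    e <- new.env()
    SparkR:::getOne(".sparkDownloaded", envir = e, ifnotfound = FALSE)  # FALSE
    assign(".sparkDownloaded", TRUE, envir = e)
    SparkR:::getOne(".sparkDownloaded", envir = e, ifnotfound = FALSE)  # TRUE
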
R/pkg/tests/fulltests/test_mllib_classification.R

Lines changed: 4 additions & 3 deletions
@@ -66,7 +66,7 @@ test_that("spark.svmLinear", {
   feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
   data <- as.data.frame(cbind(label, feature))
   df <- createDataFrame(data)
-  model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
+  model <- spark.svmLinear(df, label ~ feature, regParam = 0.1, maxIter = 5)
   prediction <- collect(select(predict(model, df), "prediction"))
   expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
 
@@ -77,10 +77,11 @@ test_that("spark.svmLinear", {
   trainidxs <- base::sample(nrow(data), nrow(data) * 0.7)
   traindf <- as.DataFrame(data[trainidxs, ])
   testdf <- as.DataFrame(rbind(data[-trainidxs, ], c(0, "the other")))
-  model <- spark.svmLinear(traindf, clicked ~ ., regParam = 0.1)
+  model <- spark.svmLinear(traindf, clicked ~ ., regParam = 0.1, maxIter = 5)
   predictions <- predict(model, testdf)
   expect_error(collect(predictions))
-  model <- spark.svmLinear(traindf, clicked ~ ., regParam = 0.1, handleInvalid = "skip")
+  model <- spark.svmLinear(traindf, clicked ~ ., regParam = 0.1,
+                           handleInvalid = "skip", maxIter = 5)
   predictions <- predict(model, testdf)
   expect_equal(class(collect(predictions)$clicked[1]), "list")
 

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 1 deletion
@@ -733,7 +733,7 @@ test_that("test cache, uncache and clearCache", {
   expect_true(dropTempView("table1"))
 
   expect_error(uncacheTable("foo"),
-               "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
+               "Error in uncacheTable : analysis error - Table or view not found: foo")
 })
 
 test_that("insertInto() on a registered table", {
@@ -1699,6 +1699,7 @@ test_that("date functions on a DataFrame", {
            list(a = 2L, b = as.Date("2013-12-14")),
            list(a = 3L, b = as.Date("2014-12-15")))
   df <- createDataFrame(l)
+  expect_equal(collect(select(df, dayofweek(df$b)))[, 1], c(5, 7, 2))
   expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15))
   expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349))
   expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51))

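For reference (not part of the diff), Spark's dayofweek uses the 1 = Sunday through 7 = Saturday numbering, which is what the expected vector above encodes; a quick check under an active SparkR session:

    # hedged sketch: 2013-12-14 is a Saturday (7) and 2014-12-15 is a Monday (2)
    d <- createDataFrame(data.frame(b = as.Date(c("2013-12-14", "2014-12-15"))))
    collect(select(d, dayofweek(d$b)))[, 1]  # 7 2
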
R/pkg/tests/fulltests/test_utils.R

Lines changed: 25 additions & 0 deletions
@@ -228,4 +228,29 @@ test_that("basenameSansExtFromUrl", {
   expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
 })
 
+test_that("getOne", {
+  dummy <- getOne(".dummyValue", envir = new.env(), ifnotfound = FALSE)
+  expect_equal(dummy, FALSE)
+})
+
+test_that("traverseParentDirs", {
+  if (is_windows()) {
+    # original path is included as-is, otherwise dirname() replaces \\ with / on windows
+    dirs <- traverseParentDirs("c:\\Users\\user\\AppData\\Local\\Apache\\Spark\\Cache\\spark2.2", 3)
+    expect <- c("c:\\Users\\user\\AppData\\Local\\Apache\\Spark\\Cache\\spark2.2",
+                "c:/Users/user/AppData/Local/Apache/Spark/Cache",
+                "c:/Users/user/AppData/Local/Apache/Spark",
+                "c:/Users/user/AppData/Local/Apache")
+    expect_equal(dirs, expect)
+  } else {
+    dirs <- traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1)
+    expect <- c("/Users/user/Library/Caches/spark/spark2.2", "/Users/user/Library/Caches/spark")
+    expect_equal(dirs, expect)
+
+    dirs <- traverseParentDirs("/home/u/.cache/spark/spark2.2", 1)
+    expect <- c("/home/u/.cache/spark/spark2.2", "/home/u/.cache/spark")
+    expect_equal(dirs, expect)
+  }
+})
+
 sparkR.session.stop()

R/pkg/tests/run-all.R

Lines changed: 3 additions & 1 deletion
@@ -46,7 +46,7 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
   tmpDir <- tempdir()
   tmpArg <- paste0("-Djava.io.tmpdir=", tmpDir)
   sparkRTestConfig <- list(spark.driver.extraJavaOptions = tmpArg,
-                          spark.executor.extraJavaOptions = tmpArg)
+                           spark.executor.extraJavaOptions = tmpArg)
 }
 
 test_package("SparkR")
@@ -60,3 +60,5 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
            NULL,
            "summary")
 }
+
+SparkR:::uninstallDownloadedSpark()

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 5 additions & 1 deletion
@@ -37,7 +37,7 @@ opts_hooks$set(eval = function(options) {
   options
 })
 r_tmp_dir <- tempdir()
-tmp_arg <- paste("-Djava.io.tmpdir=", r_tmp_dir, sep = "")
+tmp_arg <- paste0("-Djava.io.tmpdir=", r_tmp_dir)
 sparkSessionConfig <- list(spark.driver.extraJavaOptions = tmp_arg,
                            spark.executor.extraJavaOptions = tmp_arg)
 old_java_opt <- Sys.getenv("_JAVA_OPTIONS")
@@ -1183,3 +1183,7 @@ env | map
 ```{r, echo=FALSE}
 sparkR.session.stop()
 ```
+
+```{r cleanup, include=FALSE}
+SparkR:::uninstallDownloadedSpark()
+```
