Skip to content

Commit d9eaf5a

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into fred-16963
2 parents 46f6411 + dcdda19 commit d9eaf5a

File tree

164 files changed

+3749
-1289
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+3749
-1289
lines changed

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ exportMethods("arrange",
124124
"selectExpr",
125125
"show",
126126
"showDF",
127+
"storageLevel",
127128
"subset",
128129
"summarize",
129130
"summary",

R/pkg/R/DataFrame.R

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ setMethod("colnames<-",
365365

366366
# Check if the column names have . in it
367367
if (any(regexec(".", value, fixed = TRUE)[[1]][1] != -1)) {
368-
stop("Colum names cannot contain the '.' symbol.")
368+
stop("Column names cannot contain the '.' symbol.")
369369
}
370370

371371
sdf <- callJMethod(x@sdf, "toDF", as.list(value))
@@ -633,7 +633,7 @@ setMethod("persist",
633633
#' @param ... further arguments to be passed to or from other methods.
634634
#'
635635
#' @family SparkDataFrame functions
636-
#' @rdname unpersist-methods
636+
#' @rdname unpersist
637637
#' @aliases unpersist,SparkDataFrame-method
638638
#' @name unpersist
639639
#' @export
@@ -654,6 +654,32 @@ setMethod("unpersist",
654654
x
655655
})
656656

657+
#' StorageLevel
658+
#'
659+
#' Get the storage level of this SparkDataFrame.
660+
#'
661+
#' @param x the SparkDataFrame whose storage level is to be returned.
662+
#'
663+
#' @family SparkDataFrame functions
664+
#' @rdname storageLevel
665+
#' @aliases storageLevel,SparkDataFrame-method
666+
#' @name storageLevel
667+
#' @export
668+
#' @examples
669+
#'\dontrun{
670+
#' sparkR.session()
671+
#' path <- "path/to/file.json"
672+
#' df <- read.json(path)
673+
#' persist(df, "MEMORY_AND_DISK")
674+
#' storageLevel(df)
675+
#'}
676+
#' @note storageLevel since 2.1.0
677+
setMethod("storageLevel",
678+
signature(x = "SparkDataFrame"),
679+
function(x) {
680+
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
681+
})
682+
657683
#' Repartition
658684
#'
659685
#' The following options for repartition are possible:

R/pkg/R/RDD.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ setMethod("persistRDD",
261261
#' cache(rdd) # rdd@@env$isCached == TRUE
262262
#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
263263
#'}
264-
#' @rdname unpersist-methods
264+
#' @rdname unpersist
265265
#' @aliases unpersist,RDD-method
266266
#' @noRd
267267
setMethod("unpersistRDD",

R/pkg/R/generics.R

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")
691691
#' @export
692692
setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })
693693

694+
# @rdname storageLevel
695+
# @export
696+
setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") })
697+
694698
#' @rdname subset
695699
#' @export
696700
setGeneric("subset", function(x, ...) { standardGeneric("subset") })
@@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
715719
#' @export
716720
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
717721

718-
#' @rdname unpersist-methods
722+
#' @rdname unpersist
719723
#' @export
720724
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
721725

R/pkg/R/mllib.R

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,8 @@ setMethod("predict", signature(object = "KMeansModel"),
665665
#' @param tol convergence tolerance of iterations.
666666
#' @param stepSize stepSize parameter.
667667
#' @param seed seed parameter for weights initialization.
668+
#' @param initialWeights initialWeights parameter for weights initialization, it should be a
669+
#' numeric vector.
668670
#' @param ... additional arguments passed to the method.
669671
#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
670672
#' @rdname spark.mlp
@@ -677,8 +679,9 @@ setMethod("predict", signature(object = "KMeansModel"),
677679
#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
678680
#'
679681
#' # fit a Multilayer Perceptron Classification Model
680-
#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs",
681-
#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
682+
#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
683+
#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1,
684+
#' initialWeights = c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
682685
#'
683686
#' # get the summary of the model
684687
#' summary(model)
@@ -695,7 +698,7 @@ setMethod("predict", signature(object = "KMeansModel"),
695698
#' @note spark.mlp since 2.1.0
696699
setMethod("spark.mlp", signature(data = "SparkDataFrame"),
697700
function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
698-
tol = 1E-6, stepSize = 0.03, seed = NULL) {
701+
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
699702
if (is.null(layers)) {
700703
stop ("layers must be a integer vector with length > 1.")
701704
}
@@ -706,10 +709,13 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame"),
706709
if (!is.null(seed)) {
707710
seed <- as.character(as.integer(seed))
708711
}
712+
if (!is.null(initialWeights)) {
713+
initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
714+
}
709715
jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
710716
"fit", data@sdf, as.integer(blockSize), as.array(layers),
711717
as.character(solver), as.integer(maxIter), as.numeric(tol),
712-
as.numeric(stepSize), seed)
718+
as.numeric(stepSize), seed, initialWeights)
713719
new("MultilayerPerceptronClassificationModel", jobj = jobj)
714720
})
715721

R/pkg/R/utils.R

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
385385
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
386386
}
387387

388+
storageLevelToString <- function(levelObj) {
389+
useDisk <- callJMethod(levelObj, "useDisk")
390+
useMemory <- callJMethod(levelObj, "useMemory")
391+
useOffHeap <- callJMethod(levelObj, "useOffHeap")
392+
deserialized <- callJMethod(levelObj, "deserialized")
393+
replication <- callJMethod(levelObj, "replication")
394+
shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
395+
"NONE"
396+
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
397+
"DISK_ONLY"
398+
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
399+
"DISK_ONLY_2"
400+
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
401+
"MEMORY_ONLY"
402+
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
403+
"MEMORY_ONLY_2"
404+
} else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
405+
"MEMORY_ONLY_SER"
406+
} else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
407+
"MEMORY_ONLY_SER_2"
408+
} else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
409+
"MEMORY_AND_DISK"
410+
} else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
411+
"MEMORY_AND_DISK_2"
412+
} else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
413+
"MEMORY_AND_DISK_SER"
414+
} else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
415+
"MEMORY_AND_DISK_SER_2"
416+
} else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
417+
"OFF_HEAP"
418+
} else {
419+
NULL
420+
}
421+
fullInfo <- callJMethod(levelObj, "toString")
422+
if (is.null(shortName)) {
423+
fullInfo
424+
} else {
425+
paste(shortName, "-", fullInfo)
426+
}
427+
}
428+
388429
# Utility function for functions where an argument needs to be integer but we want to allow
389430
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
390431
numToInt <- function(num) {

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,21 @@ test_that("spark.mlp", {
410410
model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
411411
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
412412
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 1, 0, 0, 1))
413+
414+
# test initialWeights
415+
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
416+
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
417+
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
418+
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
419+
420+
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
421+
c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
422+
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
423+
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
424+
425+
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2)
426+
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
427+
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 2, 1, 0, 0, 1))
413428
})
414429

415430
test_that("spark.naiveBayes", {

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
796796
expect_false(collectRDD(second)[[3]]$testCol)
797797
})
798798

799-
test_that("cache(), persist(), and unpersist() on a DataFrame", {
799+
test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
800800
df <- read.json(jsonPath)
801801
expect_false(df@env$isCached)
802802
cache(df)
@@ -808,6 +808,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
808808
persist(df, "MEMORY_AND_DISK")
809809
expect_true(df@env$isCached)
810810

811+
expect_equal(storageLevel(df),
812+
"MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
813+
811814
unpersist(df)
812815
expect_false(df@env$isCached)
813816

@@ -845,7 +848,7 @@ test_that("names() colnames() set the column names", {
845848
expect_equal(names(df)[1], "col3")
846849

847850
expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
848-
"Colum names cannot contain the '.' symbol.")
851+
"Column names cannot contain the '.' symbol.")
849852
expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
850853
expect_error(colnames(df) <- c("a"),
851854
"Column names must have the same length as the number of columns in the dataset.")

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ object WriteInputFormatTestDataGenerator {
144144

145145
// Create test data for ArrayWritable
146146
val data = Seq(
147-
(1, Array()),
147+
(1, Array.empty[Double]),
148148
(2, Array(3.0, 4.0, 5.0)),
149149
(3, Array(4.0, 5.0, 6.0))
150150
)

0 commit comments

Comments
 (0)