
Commit 21568f9

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-14134

2 parents: 81af47b + bb1fa5b

583 files changed: 22,263 additions and 16,023 deletions


LICENSE

Lines changed: 1 addition & 0 deletions
@@ -238,6 +238,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
     (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
     (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
     (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
+    (BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://www.antlr.org/)
     (BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
     (BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
     (BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -265,6 +265,7 @@ exportMethods("%in%",
               "var_samp",
               "weekofyear",
               "when",
+              "window",
               "year")

 exportClasses("GroupedData")

R/pkg/R/functions.R

Lines changed: 63 additions & 0 deletions
@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
             column(jc)
           })

+#' window
+#'
+#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
+#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
+#' the order of months are not supported.
+#'
+#' The time column must be of TimestampType.
+#'
+#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
+#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
+#' If the `slideDuration` is not provided, the windows will be tumbling windows.
+#'
+#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
+#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
+#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.
+#'
+#' The output column will be a struct called 'window' by default with the nested columns 'start'
+#' and 'end'.
+#'
+#' @family datetime_funcs
+#' @rdname window
+#' @name window
+#' @export
+#' @examples
+#'\dontrun{
+#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
+#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
+#' window(df$time, "1 minute", "15 seconds", "10 seconds")
+#'
+#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
+#' # 09:01:15-09:02:15...
+#' window(df$time, "1 minute", startTime = "15 seconds")
+#'
+#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
+#' window(df$time, "30 seconds", "10 seconds")
+#'}
+setMethod("window", signature(x = "Column"),
+          function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
+            stopifnot(is.character(windowDuration))
+            if (!is.null(slideDuration) && !is.null(startTime)) {
+              stopifnot(is.character(slideDuration) && is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration, startTime)
+            } else if (!is.null(slideDuration)) {
+              stopifnot(is.character(slideDuration))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration)
+            } else if (!is.null(startTime)) {
+              stopifnot(is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, windowDuration, startTime)
+            } else {
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration)
+            }
+            column(jc)
+          })
+
 #' locate
 #'
 #' Locate the position of the first occurrence of substr.
R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
 #' @export
 setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })

+#' @rdname window
+#' @export
+setGeneric("window", function(x, ...) { standardGeneric("window") })
+
 #' @rdname year
 #' @export
 setGeneric("year", function(x) { standardGeneric("year") })

R/pkg/R/mllib.R

Lines changed: 62 additions & 29 deletions
@@ -32,6 +32,11 @@ setClass("NaiveBayesModel", representation(jobj = "jobj"))
 #' @export
 setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))

+#' @title S4 class that represents a KMeansModel
+#' @param jobj a Java object reference to the backing Scala KMeansModel
+#' @export
+setClass("KMeansModel", representation(jobj = "jobj"))
+
 #' Fits a generalized linear model
 #'
 #' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
@@ -154,17 +159,6 @@ setMethod("summary", signature(object = "PipelineModel"),
             colnames(coefficients) <- c("Estimate")
             rownames(coefficients) <- unlist(features)
             return(list(coefficients = coefficients))
-          } else if (modelName == "KMeansModel") {
-            modelSize <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                     "getKMeansModelSize", object@model)
-            cluster <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                   "getKMeansCluster", object@model, "classes")
-            k <- unlist(modelSize)[1]
-            size <- unlist(modelSize)[-1]
-            coefficients <- t(matrix(coefficients, ncol = k))
-            colnames(coefficients) <- unlist(features)
-            rownames(coefficients) <- 1:k
-            return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
           } else {
             stop(paste("Unsupported model", modelName, sep = " "))
           }
@@ -213,21 +207,21 @@ setMethod("summary", signature(object = "NaiveBayesModel"),
 #' @examples
 #' \dontrun{
 #' model <- kmeans(x, centers = 2, algorithm="random")
-#'}
+#' }
 setMethod("kmeans", signature(x = "DataFrame"),
           function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
             columnNames <- as.array(colnames(x))
             algorithm <- match.arg(algorithm)
-            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", x@sdf,
-                                 algorithm, iter.max, centers, columnNames)
-            return(new("PipelineModel", model = model))
+            jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", x@sdf,
+                                centers, iter.max, algorithm, columnNames)
+            return(new("KMeansModel", jobj = jobj))
           })

-#' Get fitted result from a model
+#' Get fitted result from a k-means model
 #'
-#' Get fitted result from a model, similarly to R's fitted().
+#' Get fitted result from a k-means model, similarly to R's fitted().
 #'
-#' @param object A fitted MLlib model
+#' @param object A fitted k-means model
 #' @return DataFrame containing fitted values
 #' @rdname fitted
 #' @export
@@ -237,19 +231,58 @@ setMethod("kmeans", signature(x = "DataFrame"),
 #' fitted.model <- fitted(model)
 #' showDF(fitted.model)
 #'}
-setMethod("fitted", signature(object = "PipelineModel"),
+setMethod("fitted", signature(object = "KMeansModel"),
           function(object, method = c("centers", "classes"), ...) {
-            modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                     "getModelName", object@model)
+            method <- match.arg(method)
+            return(dataFrame(callJMethod(object@jobj, "fitted", method)))
+          })

-            if (modelName == "KMeansModel") {
-              method <- match.arg(method)
-              fittedResult <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                          "getKMeansCluster", object@model, method)
-              return(dataFrame(fittedResult))
-            } else {
-              stop(paste("Unsupported model", modelName, sep = " "))
-            }
+#' Get the summary of a k-means model
+#'
+#' Returns the summary of a k-means model produced by kmeans(),
+#' similarly to R's summary().
+#'
+#' @param object a fitted k-means model
+#' @return the model's coefficients, size and cluster
+#' @rdname summary
+#' @export
+#' @examples
+#' \dontrun{
+#' model <- kmeans(trainingData, 2)
+#' summary(model)
+#' }
+setMethod("summary", signature(object = "KMeansModel"),
+          function(object, ...) {
+            jobj <- object@jobj
+            features <- callJMethod(jobj, "features")
+            coefficients <- callJMethod(jobj, "coefficients")
+            cluster <- callJMethod(jobj, "cluster")
+            k <- callJMethod(jobj, "k")
+            size <- callJMethod(jobj, "size")
+            coefficients <- t(matrix(coefficients, ncol = k))
+            colnames(coefficients) <- unlist(features)
+            rownames(coefficients) <- 1:k
+            return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
+          })
+
+#' Make predictions from a k-means model
+#'
+#' Make predictions from a model produced by kmeans().
+#'
+#' @param object A fitted k-means model
+#' @param newData DataFrame for testing
+#' @return DataFrame containing predicted labels in a column named "prediction"
+#' @rdname predict
+#' @export
+#' @examples
+#' \dontrun{
+#' model <- kmeans(trainingData, 2)
+#' predicted <- predict(model, testData)
+#' showDF(predicted)
+#' }
+setMethod("predict", signature(object = "KMeansModel"),
          function(object, newData) {
+            return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
           })

 #' Fit a Bernoulli naive Bayes model
R/pkg/inst/tests/testthat/test_context.R

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform", "drop")
+                     "summary", "transform", "drop", "window")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`
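This test update reflects that the new window() generic masks stats::window once SparkR is attached. A minimal sketch of checking that interactively, assuming a SparkR session; this is not part of the commit:

library(SparkR)
# "window" should now appear among the names base R reports as masked.
"window" %in% conflicts()
# The base version stays reachable through its namespace.
stats::window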

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 8 additions & 0 deletions
@@ -791,3 +791,11 @@ test_that("sampleByKey() on pairwise RDDs", {
   expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
   expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
 })
+
+test_that("Test correct concurrency of RRDD.compute()", {
+  rdd <- parallelize(sc, 1:1000, 100)
+  jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row")
+  zrdd <- callJMethod(jrdd, "zip", jrdd)
+  count <- callJMethod(zrdd, "count")
+  expect_equal(count, 1000)
+})

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 36 additions & 0 deletions
@@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
   expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
 })

+test_that("time windowing (window()) with all inputs", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with slide duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "2 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1, 1))
+})
+
+test_that("time windowing (window()) with start time", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with just window duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds")
+  local <- collect(df)$v
+  # Not checking time windows because of possible time zone issues. Just checking that the function
+  # works
+  expect_equal(local, c(1))
+})
+
 test_that("when(), otherwise() and ifelse() on a DataFrame", {
   l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
   df <- createDataFrame(sqlContext, l)