Commits (48)
19dcb2d
First commit gapply
NarineK May 2, 2016
9c5473f
Fixed Roxygen issue
NarineK May 2, 2016
66ca64e
Fix test cases
NarineK May 2, 2016
6bc882b
fixed ordering for MapGroupsR and MapPartitionsInR
NarineK May 2, 2016
43f8ec3
Fixed scala style-check issues + added/fixed comments
NarineK May 2, 2016
f8caa70
Add pretty test cases and examples
NarineK May 2, 2016
1b2f5c1
More scala stylecheck fixes.
NarineK May 3, 2016
caefc71
added ml examples
NarineK May 3, 2016
84fe176
Added support for 'Column' type on R side and made the test cases mor…
NarineK May 3, 2016
4067be7
Fixed R check style
NarineK May 3, 2016
f5aab7d
more style beautifications
NarineK May 3, 2016
da1bfea
Small fixes in the comments
NarineK May 3, 2016
a1c33ec
merged with master
NarineK May 11, 2016
8b8ec8c
Supporting multiple groups, gapply support both for SparkDataFrame an…
NarineK May 12, 2016
da7bb2b
Merged with master
NarineK May 13, 2016
0b1b255
Added: A worker support for multiple groups (with minimal modifications)
NarineK May 15, 2016
7e58472
Removed gapply mode from R side + passing keys(grouping column names)…
NarineK May 18, 2016
b3ed805
merging with master
NarineK May 18, 2016
07bbbd2
Merge branch 'gapply2' of https://github.com/NarineK/spark into gapply2
NarineK May 25, 2016
0928740
Addressed Shivaram's comments + added the key to R functions
NarineK May 26, 2016
9cacd4d
Merge branch 'master' of https://github.com/apache/spark into gapply2
NarineK May 26, 2016
f8c994f
Addressed Shivaram's and some of Sun-Rui's comments
NarineK May 29, 2016
b6cd08a
Merge branch 'master' of https://github.com/apache/spark into gapply2
NarineK May 29, 2016
4532102
merge with master + remove sqlContext
NarineK May 29, 2016
52c9f6d
Calling elapsedSecs() in computeHelper
NarineK May 30, 2016
6b91858
Fixing elapsedSecs() in computeHelper
NarineK May 30, 2016
aca5395
Bringing back the changes for computing the elap
NarineK May 30, 2016
a0425c1
Updated comments for - R function
NarineK May 31, 2016
7b5767a
updated SparkDataFrame's gapply comment
NarineK May 31, 2016
10f99d1
move groupNames above the execute()
NarineK Jun 1, 2016
7e1f7c2
Addressed sun-rui's comments + added a new test case
NarineK Jun 4, 2016
cbde29a
Optimized worker
NarineK Jun 4, 2016
b4fef85
Merge branch 'master' of https://github.com/apache/spark into gapply2
NarineK Jun 4, 2016
46df2ee
pass correct encoder and deserializer after merge
NarineK Jun 4, 2016
afa7e4e
move output elaps to a variable
NarineK Jun 5, 2016
0a22042
revert changes in MapPartitionsRWrapper
NarineK Jun 5, 2016
249568e
Merge branch 'master' of https://github.com/apache/spark into gapply2
NarineK Jun 5, 2016
e4fa8e6
Fixing test cases - sorting the dataframe after collecting it
NarineK Jun 6, 2016
00a091e
test case with iris - removing sqlContext argument + other small changes
NarineK Jun 6, 2016
afa385d
fixes white spaces in test cases
NarineK Jun 9, 2016
20a1c37
overriding stringArgs in FlatMapGroupsInR
NarineK Jun 10, 2016
e07f41b
Merge branch 'master' of https://github.com/apache/spark into gapply2
NarineK Jun 10, 2016
0ca74fd
overriding stringArgs in FlatMapGroupsInR: changing input arguments
NarineK Jun 10, 2016
d51441f
Addressed Cheng Lian's comments
NarineK Jun 11, 2016
1aa368d
Addressed sun-rui's comments
NarineK Jun 13, 2016
91e1944
RelationalGroupedData.flatMapGroupsInR use passed arguments
NarineK Jun 13, 2016
4d1cc6b
removed unnecessary comment
NarineK Jun 15, 2016
fe36d24
Updated examples' doc
NarineK Jun 16, 2016
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -61,6 +61,7 @@ exportMethods("arrange",
"filter",
"first",
"freqItems",
"gapply",
"group_by",
"groupBy",
"head",
82 changes: 81 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -1180,7 +1180,7 @@ dapplyInternal <- function(x, func, schema) {
#' func should have only one parameter, to which a data.frame corresponding
#' to each partition will be passed.
#' The output of func should be a data.frame.
- #' @param schema The schema of the resulting DataFrame after the function is applied.
+ #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' It must match the output of func.
#' @family SparkDataFrame functions
#' @rdname dapply
@@ -1266,6 +1266,86 @@ setMethod("dapplyCollect",
ldf
})

#' gapply
#'
#' Group the SparkDataFrame using the specified columns and apply the R function to each
#' group.
#'
#' @param x A SparkDataFrame
#' @param cols Grouping columns
#' @param func A function to be applied to each group partition specified by grouping
Contributor:
Minor comment: It would be good to say what the function will get as its input. Right now it's the key and a data frame with the grouping columns?

NarineK (Author):
Yes, the key and the data frame with the grouping columns.

NarineK (Author):
done!

#' columns of the SparkDataFrame. The function `func` takes as arguments
#' a key (the grouping values) and a data frame (a local R data.frame
#' holding the rows of the group). The output of `func` is a local R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
Contributor:
Minor comment: It will be good to clarify how this schema can be constructed, i.e. something like: "The output schema is usually the schema for the key along with the schema of the output R data frame." We can also highlight this in the programming guide.

NarineK (Author), Jun 15, 2016:
The output schema is purely based on the output data frame; if the key is included in the output, then we need to include the key in the schema. Basically, the schema has to match what we want to output. If we want to output only the average in the above example, we could have:

schema <- structType(structField("avg", "double"))

What really matters is the data type: it has to be double in the above example; it cannot be string or character, unless we explicitly convert it to, e.g., string in the R function. The name doesn't matter either; I could have "hello" instead of "avg".

NarineK (Author), Jun 15, 2016:
I could have in the documentation something like:

"The schema has to correspond to the output SparkDataFrame. It has to be defined for each output column with the preferred output column name and corresponding data type."

How does this sound?

Contributor:
Yeah, that's fine. Also, in the example below where we construct the schema, you can add a comment line which looks like: "Here our output contains 2 columns, the key which is an integer and the mean which is a double."
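
A minimal sketch of the point discussed above (illustrative only, not part of the patch), reusing the example data from the docs below and assuming an initialized SparkR session: if only the average is wanted, the schema declares a single double column, and its name is arbitrary.

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))
# One output column: the name ("avg" here) is free, but the type must be double.
schema <- structType(structField("avg", "double"))
df1 <- gapply(df, list("a", "c"), function(key, x) {
  data.frame(mean(x$b))  # matches the one-column double schema
}, schema)
collect(df1)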

#' The schema must match the output of `func`. It has to be defined for each
#' output column with the preferred output column name and corresponding data type.
#' @family SparkDataFrame functions
#' @rdname gapply
#' @name gapply
#' @export
#' @examples
#'
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' Here our output contains three columns: the key, which is a combination of two
#' columns with data types integer and string, and the mean, which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' df,
#' list("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#'
#' Result
#' ------
#' a c avg
#' 3 3 3.0
#' 1 1 1.5
#'
#' Fits linear models on the iris dataset by grouping on the 'Species' column and
#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
#' and 'Petal_Width' as training features.
#'
#' df <- createDataFrame (iris)
#' schema <- structType(structField("(Intercept)", "double"),
Contributor:
Similar to above, do the column names also have to match? i.e., is "(Intercept)" important here, or would "Intercept" work as well?

NarineK (Author):
The names do not have to match; we can give any name we want. Instead of "(Intercept)" I could have "(MyIntercept)". The data type is important.

#' structField("Sepal_Width", "double"),structField("Petal_Length", "double"),
#' structField("Petal_Width", "double"))
#' df1 <- gapply(
#' df,
#' list(df$"Species"),
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' }, schema)
#' collect(df1)
#'
#' Result
#' ------
#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#'
#'}
setMethod("gapply",
signature(x = "SparkDataFrame"),
function(x, cols, func, schema) {
grouped <- do.call("groupBy", c(x, cols))
gapply(grouped, func, schema)
})
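
Since the SparkDataFrame method above simply builds the grouped data and delegates, the two call forms below are equivalent. A hedged sketch, assuming the df and schema from the roxygen example above and an initialized SparkR session:

# SparkDataFrame form: grouping columns passed to gapply directly.
df1 <- gapply(df, list("a", "c"), function(key, x) {
  data.frame(key, mean(x$b), stringsAsFactors = FALSE)
}, schema)
# GroupedData form: group first, then apply (what the method above does internally).
df2 <- gapply(groupBy(df, "a", "c"), function(key, x) {
  data.frame(key, mean(x$b), stringsAsFactors = FALSE)
}, schema)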

############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
# but allow for use with DataFrames by first converting to an RRDD before calling #
30 changes: 30 additions & 0 deletions R/pkg/R/deserialize.R
@@ -197,6 +197,36 @@ readMultipleObjects <- function(inputCon) {
data # this is a list of named lists now
}

readMultipleObjectsWithKeys <- function(inputCon) {
# readMultipleObjectsWithKeys will read multiple continuous objects from
# a DataOutputStream. There is no preceding field telling the count
# of the objects, so the number of objects varies; we try to read
# all objects in a loop until the end of the stream. This function
# is for use by gapply. Each group of rows is followed by the grouping
# key for this group, which is then followed by the next group.
keys <- list()
data <- list()
subData <- list()
while (TRUE) {
# If reaching the end of the stream, type returned should be "".
type <- readType(inputCon)
if (type == "") {
break
} else if (type == "r") {
type <- readType(inputCon)
# A grouping boundary detected
key <- readTypedObject(inputCon, type)
index <- length(data) + 1L
data[[index]] <- subData
keys[[index]] <- key
subData <- list()
} else {
subData[[length(subData) + 1L]] <- readTypedObject(inputCon, type)
}
}
list(keys = keys, data = data) # this is a list of keys and corresponding data
}
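
For illustration only (hypothetical data, not part of the patch), the returned value has this parallel-list shape, which a caller such as the gapply worker can walk group by group:

# res$keys[[i]] is the grouping key of the i-th group; res$data[[i]] holds
# that group's rows as a list of named lists.
res <- list(
  keys = list(list(1L), list(3L)),
  data = list(
    list(list(a = 1L, b = 1), list(a = 1L, b = 2)),
    list(list(a = 3L, b = 3))))
for (i in seq_along(res$keys)) {
  cat("group", i, "key:", unlist(res$keys[[i]]),
      "rows:", length(res$data[[i]]), "\n")
}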

readRowList <- function(obj) {
# readRowList is meant for use inside an lapply. As a result, it is
# necessary to open a standalone connection for the row and consume
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -454,6 +454,10 @@ setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
#' @export
setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect") })

#' @rdname gapply
#' @export
setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })

#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
62 changes: 62 additions & 0 deletions R/pkg/R/group.R
@@ -142,3 +142,65 @@ createMethods <- function() {
}

createMethods()

#' gapply
#'
#' Applies an R function to each group in the input GroupedData
#'
#' @param x a GroupedData
#' @param func A function to be applied to each group partition specified by GroupedData.
#' The function `func` takes as arguments a key (the grouping values) and
#' a data frame (a local R data.frame holding the rows of the group).
#' The output of `func` is a local R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match the output of `func`. It has to be defined for each
#' output column with the preferred output column name and corresponding data type.
#' @return a SparkDataFrame
#' @rdname gapply
#' @name gapply
#' @examples
#' \dontrun{
#' Computes the arithmetic mean of the second column by grouping
#' on the first and third columns. Output the grouping values and the average.
#'
#' df <- createDataFrame (
#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
#' c("a", "b", "c", "d"))
#'
#' Here our output contains three columns: the key, which is a combination of two
#' columns with data types integer and string, and the mean, which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(
#' df,
#' list("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' },
#' schema)
#' collect(df1)
#'
#' Result
#' ------
#' a c avg
#' 3 3 3.0
#' 1 1 1.5
#' }
setMethod("gapply",
signature(x = "GroupedData"),
function(x, func, schema) {
if (is.null(schema)) stop("schema cannot be NULL")
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
sdf <- callJStatic(
"org.apache.spark.sql.api.r.SQLUtils",
"gapply",
x@sgd,
serialize(cleanClosure(func), connection = NULL),
packageNamesArr,
broadcastArr,
schema$jobj)
dataFrame(sdf)
})
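
A hedged sketch of what `func` receives (assuming the df from the example above; schema2 and the output columns are hypothetical names): key holds the current group's grouping values in the order of the grouping columns, so key[[1]] is the value of "a" and key[[2]] the value of "c", while x is a local R data.frame with that group's rows.

schema2 <- structType(structField("a", "integer"), structField("n", "double"))
df2 <- gapply(groupBy(df, "a", "c"), function(key, x) {
  # Echo only the first grouping value ("a") next to the group's row count.
  data.frame(key[[1]], as.numeric(nrow(x)))
}, schema2)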
65 changes: 65 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2145,6 +2145,71 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})

test_that("gapply() on a DataFrame", {
gatorsmile (Member), May 2, 2016:
You need to write a new test case for gapply. The test cases should be completely different from dapply.

NarineK (Author):
Added a new test which was used for our previous group-apply showcase for customers.

gatorsmile (Member):
This new test case is reasonable.

df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
c("a", "b", "c", "d"))
expected <- collect(df)
df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df))
actual <- collect(df1)
expect_identical(actual, expected)

# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
df2 <- gapply(
df,
list(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
},
schema)
actual <- collect(df2)$e
expected <- c(TRUE, TRUE)
expect_identical(actual, expected)

# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the grouping values and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),
structField("avg", "double"))
df3 <- gapply(
df,
list("a", "c"),
function(key, x) {
y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
},
schema)
actual <- collect(df3)
actual <- actual[order(actual$a), ]
rownames(actual) <- NULL
expected <- collect(select(df, "a", "b", "c"))
expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
colnames(expected) <- c("a", "c", "avg")
expected <- expected[order(expected$a), ]
rownames(expected) <- NULL
expect_identical(actual, expected)

irisDF <- suppressWarnings(createDataFrame (iris))
schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
# Groups by `Sepal_Length` and computes the average for `Sepal_Width`
df4 <- gapply(
cols = list("Sepal_Length"),
irisDF,
function(key, x) {
y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
},
schema)
actual <- collect(df4)
actual <- actual[order(actual$Sepal_Length), ]
rownames(actual) <- NULL
agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
stringsAsFactors = FALSE)
colnames(agg_local_df) <- c("Sepal_Length", "Avg")
expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
rownames(expected) <- NULL
expect_identical(actual, expected)
})

test_that("Window functions on a DataFrame", {
setHiveContext(sc)
df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),