Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1420,13 +1420,12 @@ setMethod("dapplyCollect",
#'
#' Here our output contains three columns: the key, which is a combination of two
#' columns with data types integer and string, and the mean, which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' schema <- structType(structField("avg", "double"))
#' result <- gapply(
#' df,
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' y <- data.frame(mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#'
#' We can also group the data and afterwards call gapply on GroupedData.
Expand All @@ -1435,7 +1434,7 @@ setMethod("dapplyCollect",
#' result <- gapply(
#' gdf,
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' y <- data.frame(mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#' collect(result)
#'
Expand Down Expand Up @@ -1465,10 +1464,10 @@ setMethod("dapplyCollect",
#'
#' Result
#' ---------
#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#' Model Species (Intercept) Sepal_Width Petal_Length Petal_Width
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an external change. I think such an external change is not acceptable after we already introduce it. Right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's going to be a breaking change, yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past we had a discussion about backward compatibility with shivaram.
#14431 (comment)

I think I didn't push R changes, because I wanted to be able to access the grouping columns on sql side first. Without being able to access the grouping columns I couldn't find a way to keep backward compatibility without breaking anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I'd think it's reasonable if under a switch

#' 1 virginica 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 versicolor 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 setosa 2.351890 0.6548350 0.2375602 0.2521257
#'
#'}
#' @note gapply(SparkDataFrame) since 2.0.0
Expand Down Expand Up @@ -1512,8 +1511,6 @@ setMethod("gapply",
#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' colnames(y) <- c("key_a", "key_c", "mean_b")
#' y
#' })
#'
#' We can also group the data and afterwards call gapply on GroupedData.
Expand All @@ -1529,7 +1526,7 @@ setMethod("gapply",
#'
#' Result
#' ------
#' key_a key_c mean_b
#' X3L X.3. mean.x.b.
#' 3 3 3.0
#' 1 1 1.5
#'
Expand All @@ -1546,13 +1543,13 @@ setMethod("gapply",
#' Sepal_Width + Petal_Length + Petal_Width, x))
#' data.frame(t(coef(m)))
#' })
#' colnames(result) <- c("Species", "Intercept", "Sepal_Width",
#' "Petal_Length", "Petal_Width")
#'
#' Result
#'---------
#' Model X.Intercept. Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 2.351890 0.6548350 0.2375602 0.2521257
#' Model Species (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 virginica 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 versicolor 1.895540 0.3868576 0.9083370 -0.6792238
#' 3 setosa 2.351890 0.6548350 0.2375602 0.2521257
#'
#'}
#' @note gapplyCollect(SparkDataFrame) since 2.0.0
Expand Down
7 changes: 7 additions & 0 deletions R/pkg/R/group.R
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ setMethod("gapplyCollect",
# which is a serialized data.frame that corresponds to one group of the
# SparkDataFrame.
ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
ldfcolNames <- if (length(ldfs) > 0) colnames(ldfs[[1]]) else NULL

# set consistent column names before calling rbind, otherwise rbind fails
ldfs <- lapply(ldfs, function(df) {
colnames(df) <- ldfcolNames
df
})
ldf <- do.call(rbind, ldfs)
row.names(ldf) <- NULL
ldf
Expand Down
17 changes: 8 additions & 9 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -2289,20 +2289,20 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
c("a", "b", "c", "d"))
expected <- collect(df)
df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
actual <- collect(df1)
actual <- collect(df1)[-1]
expect_identical(actual, expected)

df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
expect_identical(df1Collect, expected)
expect_identical(df1Collect[-1], expected)

# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
schema <- structType(structField("e", "boolean"))
df2 <- gapply(
df,
c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
y <- data.frame(sum(x$b) > 2)
},
schema)
actual <- collect(df2)$e
Expand All @@ -2322,13 +2322,12 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {

# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the grouping value and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),
structField("avg", "double"))
schema <- structType(structField("avg", "double"))
df3 <- gapply(
df,
c("a", "c"),
function(key, x) {
y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
y <- data.frame(mean(x$b), stringsAsFactors = FALSE)
},
schema)
actual <- collect(df3)
Expand All @@ -2353,13 +2352,13 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
expect_identical(actual$avg, expected$avg)

irisDF <- suppressWarnings(createDataFrame (iris))
schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
schema <- structType(structField("Avg", "double"))
# Groups by `Sepal_Length` and computes the average for `Sepal_Width`
df4 <- gapply(
cols = "Sepal_Length",
irisDF,
function(key, x) {
y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
y <- data.frame(mean(x$Sepal_Width), stringsAsFactors = FALSE)
},
schema)
actual <- collect(df4)
Expand Down
4 changes: 3 additions & 1 deletion R/pkg/inst/worker/worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ compute <- function(mode, partition, serializer, deserializer, key,
}

if (mode == 2) {
output <- computeFunc(key, inputData)
outputKeys <- data.frame(key, stringsAsFactors = FALSE)
# prepend keys to R function's output
output <- cbind(outputKeys, computeFunc(key, inputData))
} else {
output <- computeFunc(inputData)
}
Expand Down
36 changes: 19 additions & 17 deletions docs/sparkr.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,23 +328,25 @@ The output of function should be a `data.frame`. Schema specifies the row format
{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
schema <- structType(structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
y <- data.frame(max(x$eruptions))
},
schema)
colnames(result) <- c("waiting", "max_eruption")

head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
##1 96 5.100
##2 76 5.067
##3 77 5.033
##4 88 5.000
##5 86 4.933
##6 82 4.900
{% endhighlight %}
</div>

Expand All @@ -359,19 +361,19 @@ result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
y <- data.frame(max(x$eruptions))
})
colnames(result) <- c("waiting", "max_eruption")

head(result[order(result$max_eruption, decreasing = TRUE), ])

## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
##1 96 5.100
##2 76 5.067
##3 77 5.033
##4 88 5.000
##5 86 4.933
##6 82 4.900

{% endhighlight %}
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,17 @@ class RelationalGroupedDataset protected[sql](
f: Array[Byte],
packageNames: Array[Byte],
broadcastVars: Array[Broadcast[Object]],
outputSchema: StructType): DataFrame = {
schema: StructType): DataFrame = {
val groupingNamedExpressions = groupingExprs.map(alias)
val groupingCols = groupingNamedExpressions.map(Column(_))
val groupingDataFrame = df.select(groupingCols : _*)
val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)

val outputSchema = if (schema == null) {
SERIALIZED_R_DATA_SCHEMA
} else {
StructType(groupingDataFrame.schema.fields ++ schema.fields)
}
Dataset.ofRows(
df.sparkSession,
FlatMapGroupsInR(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ private[sql] object SQLUtils extends Logging {
broadcastVars: Array[Object],
schema: StructType): DataFrame = {
val bv = broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
val realSchema = if (schema == null) SERIALIZED_R_DATA_SCHEMA else schema
gd.flatMapGroupsInR(func, packageNames, bv, realSchema)
gd.flatMapGroupsInR(func, packageNames, bv, schema)
}


Expand Down