[SPARK-12922][SparkR][WIP] Implement gapply() on DataFrame in SparkR #12836
Conversation
Test build #57511 has finished for PR 12836 at commit
child: LogicalPlan) extends UnaryNode with ObjectProducer
...
object MapPartitionsInR {
Please move it back.
Thanks, will fix!
inputSchema: StructType,
outputSchema: StructType) extends ((Any, Iterator[Any]) => TraversableOnce[Any]) {
...
def apply(key: Any, iter: Iterator[Any]): TraversableOnce[Any] = {
Weird style. Need to follow the common style.
Test build #57512 has finished for PR 12836 at commit
R/pkg/R/DataFrame.R (outdated)

#' gapply
#'
#' Apply a function to each group of a DataFrame.
In the description, we need to explain what a group is; otherwise, users will not know how to use it.
case logical.MapPartitionsInR(f, p, b, is, os, objAttr, child) =>
  execution.MapPartitionsExec(
    execution.r.MapPartitionsRWrapper(f, p, b, is, os), objAttr, planLater(child)) :: Nil
case logical.MapGroupsR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
MapGroupsInR?
Renamed MapGroupsR to MapGroupsPartitionsInR.
Or maybe MapGroupsInR is better. Not sure. @sun-rui ?
SERIALIZED_R_DATA_SCHEMA
} else {
  schema
}
One line?
In order to keep it consistent with dapply, I haven't made it one line:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L142
But we can make it one line, maybe in both cases?
I think you should change both.
Test build #57517 has finished for PR 12836 at commit
Test build #60391 has finished for PR 12836 at commit
Test build #60392 has finished for PR 12836 at commit
Addressed your comments @sun-rui, please let me know if you have more comments.
@NarineK, there is one comment left unaddressed.
Test build #60574 has finished for PR 12836 at commit
#' column of the SparkDataFrame. The function `func` takes as argument
#' a key - grouping columns and a data frame - a local R data.frame.
#' The output of `func` is a local R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
Minor comment: it would be good to clarify how this schema can be constructed, i.e. something like "The output schema is usually the schema for the key along with the schema of the output R data frame." We can also highlight this in the programming guide.
The output schema is purely based on the output data frame; if the key is included in the output, then we need to include the key in the schema.
Basically, the schema has to match what we want to output. If we want to output only the average in the above example, we could have:
schema <- structType(structField("avg", "double"))
What really matters is the data type: it has to be double in the above example; it cannot be string or character, unless we explicitly convert it to, e.g., string in the R function. The name doesn't matter either; I could have "hello" instead of "avg".
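To make the schema-matching point concrete, here is a minimal sketch (the SparkDataFrame `df` and its `key`/`value` columns are illustrative, not taken from this PR's tests, and a Spark 2.0-style SparkR session is assumed):

```r
library(SparkR)

# Illustrative data: an integer key and a numeric value.
df <- createDataFrame(data.frame(key = c(1L, 1L, 2L), value = c(1.5, 2.5, 4.0)))

# Output only the average: a single double column. The column name is free
# ("avg", "hello", ...); only the declared type has to match what the
# R function actually returns.
schema <- structType(structField("avg", "double"))
result <- gapply(df, "key",
                 function(key, x) data.frame(avg = mean(x$value)),
                 schema)
head(collect(result))
```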
I could have in the documentation something like:
"The schema has to correspond to the output SparkDataFrame. It has to be defined for each output column with the preferred output column name and corresponding data type."
How does this sound?
Yeah, that's fine. Also, in the example below where we construct the schema, you can add a comment line which looks like: "Here our output contains 2 columns, the key which is an integer and the mean which is a double."
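Applied to the example, the suggested comment might read like this (a sketch; `df` and the column names are illustrative):

```r
# Here our output contains 2 columns: the key, which is an integer,
# and the mean, which is a double.
schema <- structType(structField("key", "integer"),
                     structField("mean", "double"))
result <- gapply(df, "key",
                 function(key, x) data.frame(key, mean(x$value)),
                 schema)
```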
@NarineK Thanks again for the updates to this PR, and thanks @sun-rui for reviewing. The code changes LGTM; the refactoring of worker.R is especially useful for readability. I just had a couple of minor questions on the API and examples. Also, since we are close to RC1, my vote would be to merge this PR right now and continue making any updates to examples/docs in follow-up PRs. @NarineK Would you be able to update the programming guide for gapply? #13660 is doing it for
@shivaram, LGTM
Test build #60621 has finished for PR 12836 at commit
Merging this to master and branch-2.0
## What changes were proposed in this pull request?
gapply() applies an R function on groups grouped by one or more columns of a DataFrame, and returns a DataFrame. It is like GroupedDataSet.flatMapGroups() in the Dataset API. Please let me know what you think and if you have any ideas to improve it. Thank you!
## How was this patch tested?
Unit tests.
1. Primitive test with different column types
2. Add a boolean column
3. Compute average by a group
Author: Narine Kokhlikyan <[email protected]>
Author: NarineK <[email protected]>
Closes #12836 from NarineK/gapply2.
(cherry picked from commit 7c6c692)
Signed-off-by: Shivaram Venkataraman <[email protected]>
Hi @vectorijk,
@NarineK Cool~ I think it is better to open a separate PR to track
@vectorijk, should I open the pull request for the same JIRA - https://issues.apache.org/jira/browse/SPARK-15672 - or should I create a new JIRA for gapply's programming guide?
@NarineK I am not quite sure. Maybe you could create a new JIRA for gapply's programming guide.
Thanks for the quick response. I'll create one.
@shivaram, @sun-rui, I was wondering if someone created a JIRA for the issue described here:
@NarineK Not as far as I know
No, go ahead and submit one :)
What changes were proposed in this pull request?
gapply() applies an R function on groups grouped by one or more columns of a DataFrame, and returns a DataFrame. It is like GroupedDataSet.flatMapGroups() in the Dataset API.
Please let me know what you think and if you have any ideas to improve it.
Thank you!
How was this patch tested?
Unit tests.
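For instance, the "compute average by a group" case could be exercised roughly as follows (a sketch assuming a running SparkR session with a local Spark installation; this is not the actual test code from the PR, and the data and column names are illustrative):

```r
library(SparkR)
sparkR.session()  # assumes a local Spark installation

local_df <- data.frame(key = c(1L, 1L, 2L), value = c(10, 20, 30))
df <- createDataFrame(local_df)

schema <- structType(structField("key", "integer"),
                     structField("avg", "double"))
actual <- collect(gapply(df, "key",
                         function(key, x) data.frame(key, mean(x$value)),
                         schema))

# Compare with plain R's per-group averages.
expected <- aggregate(value ~ key, local_df, mean)
stopifnot(all.equal(sort(actual$avg), sort(expected$value)))
```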