Conversation


@sun-rui sun-rui commented Apr 19, 2016

What changes were proposed in this pull request?

dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame.

The function signature is:

dapply(df, function(localDF) {}, schema = NULL)

R function input: local data.frame from the partition on local node
R function output: local data.frame

The schema specifies the row format of the resulting DataFrame. It must match the output of the R function.
If the schema is not specified, each partition of the resulting DataFrame is serialized in R into a single byte array. Such a DataFrame can only be processed by successive calls to dapply().
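
For illustration, a minimal sketch of the API described above (assumes the Spark 2.0-era SparkR API with an existing sqlContext; the functions shown just pass data through):

```r
# Identity function: the result has the same schema as the input.
df <- createDataFrame(sqlContext, mtcars)
df1 <- dapply(df, function(localDF) { localDF }, schema(df))

# Without a schema, each result partition stays as a serialized byte
# array; such a DataFrame can only be consumed by further dapply() calls.
df2 <- dapply(df, function(localDF) { localDF })
```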

How was this patch tested?

SparkR unit tests.


SparkQA commented Apr 19, 2016

Test build #56206 has finished for PR 12493 at commit 00a8c1c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


sun-rui commented Apr 19, 2016

@rxin, @davies, @NarineK, @shivaram, please help review this so that it can catch Spark 2.0.


SparkQA commented Apr 19, 2016

Test build #56209 has finished for PR 12493 at commit e6b67b0.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

#' @family DataFrame functions
#' @rdname dapply
#' @name dapply
#' @export
Member

pls add doc example

Contributor Author

done

@shivaram
Contributor

Thanks @sun-rui for the change. I did a first pass over it. It would be good to add some more test cases for where the schema is not specified as well.

Also I think we need somebody from the SQL side to look at this (cc @rxin @davies)


SparkQA commented Apr 20, 2016

Test build #56320 has finished for PR 12493 at commit 480dec9.

  • This patch fails some tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


sun-rui commented Apr 20, 2016

@shivaram, there is already a test case for the case where the schema is not specified. Do you mean adding more?


rxin commented Apr 20, 2016

@davies should take a detailed look at this.

This looks pretty good based on my very very quick glance.

* A function wrapper that applies the given R function to each partition.
*/
private[sql] case class MapPartitionsRWrapper(
func: Array[Byte],
Contributor

indent

Contributor Author

done


rxin commented Apr 20, 2016

BTW one observation: FWIW, I think the dapplyCollect method will be a lot more useful, because that's the one that can be used for training models, etc.


SparkQA commented Apr 20, 2016

Test build #56322 has finished for PR 12493 at commit 80da663.

  • This patch fails SparkR unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


sun-rui commented Apr 20, 2016

The test failure is weird:

1. Failure (at test_sparkSQL.R#1973): dapply() on a DataFrame ------------------
expected is not identical to result. Differences: 
Attributes: < Component "row.names": Numeric: lengths (33, 32) differ >
Component "mpg": Numeric: lengths (33, 32) differ

The unit tests passed on my machine. Does anyone have an idea?


sun-rui commented Apr 20, 2016

@rxin, I will implement dapplyCollect() and collect() on a DataFrame of serialized R data in a follow-up PR.

@shivaram
Contributor

@sun-rui Regarding the unit tests, could it be related to the R version or the version of testthat we are using on Jenkins?


sun-rui commented Apr 21, 2016

When I rebased this PR to master, I found a bug in Catalyst optimizer. I submitted a PR for it #12575. I have to wait for it to be fixed.

#' @examples
#' \dontrun{
#' df <- createDataFrame (sqlContext, mtcars)
#' df1 <- dapply(df, function(x) { x }, schema(df))
Member

Could we have a more elaborate example to explain how func should expect or handle "each partition of the DataFrame"?

Contributor Author

added
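
For reference, an example in the spirit of what was requested here -- not necessarily the one that was added to the docs; the column names and schema are made up:

```r
# The function is called once per partition and receives that partition
# as an ordinary local data.frame, so plain R code applies to it.
df <- createDataFrame(sqlContext, data.frame(a = 1:6, b = rnorm(6)))
newSchema <- structType(structField("a", "integer"),
                        structField("b", "double"),
                        structField("sum", "double"))
df1 <- dapply(df, function(x) { cbind(x, x$a + x$b) }, newSchema)
```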


SparkQA commented Apr 22, 2016

Test build #56674 has finished for PR 12493 at commit 76a6fd7.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Apr 22, 2016

Test build #56684 has finished for PR 12493 at commit 481df69.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Apr 22, 2016

Test build #56663 has finished for PR 12493 at commit 75dae85.

  • This patch fails SparkR unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


sun-rui commented Apr 27, 2016

Jenkins, retest this please


SparkQA commented Apr 27, 2016

Test build #57110 has finished for PR 12493 at commit b39466c.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

@sun-rui I poked around this a little bit more today. It looks like what is happening is that somehow we are creating factor-type objects when we have strings in our data frame. I think the problem is in the line
data <- do.call(rbind.data.frame, c(data, stringsAsFactors = FALSE)) in worker.R -- I am not sure the stringsAsFactors = FALSE is being passed correctly.


sun-rui commented Apr 28, 2016

@shivaram, is the R version on Jenkins 3.1.1? It seems I need to test with it.

@shivaram
Contributor

Yeah the version on Jenkins is R version 3.1.1 (2014-07-10) and on my laptop is R version 3.2.1 (2015-06-18). I can see the error on my laptop as well


sun-rui commented Apr 28, 2016

I am using R 3.2.4. I just re-ran the test with success. OK, let me try some older versions.

@shivaram
Contributor

Aha - I think the option didn't exist before. From https://cran.r-project.org/src/base/NEWS

CHANGES IN R 3.2.4:
....
    The data.frame method of rbind() gains an optional argument
      stringsAsFactors (instead of only depending on
      getOption("stringsAsFactors")).
....
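
A minimal sketch of the version difference (list contents made up; the pre-3.2.4 behavior described in the comments is inferred, not verified):

```r
rows <- list(list(name = "a"), list(name = "b"))

# R >= 3.2.4: the named argument is recognized by the data.frame method
# of rbind(), so character columns stay character.
df <- do.call(rbind.data.frame, c(rows, stringsAsFactors = FALSE))

# R < 3.2.4: rbind.data.frame has no stringsAsFactors formal, so the
# named value presumably falls through into `...` and is row-bound as
# one more element -- which would also explain the 33-vs-32 row mismatch
# in the test failure reported earlier -- while string-to-factor
# conversion still follows getOption("stringsAsFactors").
```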

@shivaram
Contributor

I think the best workaround is to do something like

oldOpt <- getOption("stringsAsFactors")
options(stringsAsFactors=FALSE)
data <- do.call(rbind.data.frame, data)
options(stringsAsFactors=oldOpt)

i.e. set the global option before calling rbind and then reset it to the previous value


sun-rui commented Apr 28, 2016

I am not sure whether it is necessary to set "stringsAsFactors" to FALSE; I just added it for safety. Remove it for now?


sun-rui commented Apr 28, 2016

and add a comment for a future revisit

@shivaram
Contributor

Yeah adding a comment to revisit in future sounds good.

@shivaram
Contributor

FWIW I tried the 4 lines I wrote above and it works on my machine. The code in worker.R looks something like

...
+    if (isDataFrame) {
+      if (deserializer == "row") {
+        # Transform the list of rows into a data.frame
+        oldOpt <- getOption("stringsAsFactors")
+        options(stringsAsFactors = FALSE)
+        data <- do.call(rbind.data.frame, data)
+        options(stringsAsFactors = oldOpt)
+        names(data) <- colNames
+      } else {
...


sun-rui commented Apr 28, 2016

Yes, I tried; "stringsAsFactors" must be FALSE, as our SerDe does not support factors for now,
so I am changing the code per your proposal.


sun-rui commented Apr 28, 2016

@shivaram, I changed the code. Let's wait for the test result :)

@shivaram
Contributor

Cool, the R code LGTM. @davies / @rxin, if one of you can take a final pass at the SQL changes, this should be good to go.


SparkQA commented Apr 28, 2016

Test build #57201 has finished for PR 12493 at commit 2264b57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Returns a new [[DataFrame]] that contains the result of applying a serialized R function
* `func` to each partition.
*
* @group func
Contributor

Maybe we can add a @since attribute in the comment?

Contributor Author

@sun-rui sun-rui Apr 29, 2016

Spark 2.0 is a good chance to add "since" for the SparkR API methods. But I think we should do it consistently for all methods at once. I will submit a new JIRA issue for it.



davies commented Apr 28, 2016

LGTM overall. There are still a few changes that are not needed by this PR (for example, SERIALIZED_R_DATA_SCHEMA); are these kept for the future?


sun-rui commented Apr 29, 2016

@davies, yes, those changes are deliberately kept for future PRs, like dapplyCollect().


SparkQA commented Apr 29, 2016

Test build #57296 has finished for PR 12493 at commit 3efe9f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

Jenkins, retest this please


SparkQA commented Apr 29, 2016

Test build #57312 has finished for PR 12493 at commit 3efe9f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

Merging this to master

@asfgit asfgit closed this in 4ae9fe0 Apr 29, 2016