
Conversation

@clarkfitzg
Contributor

What changes were proposed in this pull request?

Fixed bug in dapplyCollect by changing the compute function of worker.R to explicitly handle raw (binary) vectors.
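For reference, the helper at the heart of this change (fragments of it are quoted in the review comments below) boils down to roughly the following sketch; the merged version may differ in details:

# Bind a list of rows (each row a list of column values) into a data.frame,
# keeping raw (binary) columns intact rather than letting rbind coerce them.
rbindRaws <- function(inputData) {
  row1 <- inputData[[1]]
  rawcolumns <- ("raw" == sapply(row1, class))

  # rbind over lists yields a matrix of mode "list", so raw vectors survive
  listmatrix <- do.call(rbind, inputData)

  # A data.frame whose columns are all lists
  out <- as.data.frame(listmatrix)

  # Flatten only the non-raw columns back to atomic vectors
  out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
  out
}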

cc @shivaram

How was this patch tested?

Unit tests

@shivaram
Contributor

Jenkins, ok to test

@shivaram
Contributor

Thanks @clarkfitzg -- I'll take a look at this tomorrow

@clarkfitzg
Contributor Author

My pleasure. Let me know if / when I should squash these commits or rebase.

Working on some before and after benchmarks now.

createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
  sparkSession <- getSparkSession()

  # Convert dataframes into a list of rows. Each row is a list
Contributor

@sun-rui commented Aug 24, 2016

how about " If the data is a dataframe, convert it into ..."?

@SparkQA

SparkQA commented Aug 24, 2016

Test build #64335 has finished for PR 14783 at commit 5871257.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 24, 2016

Test build #64337 has finished for PR 14783 at commit 84ef4cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@clarkfitzg
Contributor Author

This change doesn't appear to make any difference in speed.

# Wed Aug 24 14:12:12 KST 2016
# Benchmarking performance before and after dapplyCollect patch

# Downloaded data here:
# https://s3-us-west-2.amazonaws.com/sparkr-data/nycflights13.csv

library(SparkR)
library(microbenchmark)

sparkR.session()

df <- read.csv("~/data/nycflights13.csv")

sdf <- createDataFrame(df)

# BEFORE: 7.27 seconds
# AFTER: 7.20 seconds
# The patch shouldn't change this at all
microbenchmark({sdf <- createDataFrame(df)}, times=1)

# BEFORE: 502 seconds
# AFTER: 508 seconds
microbenchmark({
    df2 <- dapplyCollect(sdf, function(x) x)
}, times=1)

@clarkfitzg
Contributor Author

Not sure why these timings are so bad. I found out today that by using bytes and calling directly into Java's org.apache.spark.api.r.RRDD these can be improved by two orders of magnitude.

@clarkfitzg
Contributor Author

Not completely sure though. I'll look into these timings a little further on Saturday to make sure I'm making a fair comparison.

@clarkfitzg
Contributor Author

Tried some more benchmarks today. Didn't see any difference in speed before/after the patch. Observing the processes as they run, I see the vast majority of the time spent in the local R process, with just a couple of seconds in the actual parallel evaluation of the functions.


@clarkfitzg
Contributor Author

@shivaram what do you think?

@sun-rui
Contributor

sun-rui commented Aug 31, 2016

@clarkfitzg, your patch is a bug fix, not a performance improvement, right? If so, since there is no performance regression according to your benchmark, let's focus on the functionality. We can address performance issues in separate JIRA issues.

@clarkfitzg
Contributor Author

Yes, this is only a bug fix. @shivaram mentioned in a previous email exchange that it would be good to see some performance benchmarks as well.

@felixcheung
Member

felixcheung commented Aug 31, 2016

Should we have a test against a DataFrame with a binary column?
Or should this test_that("dapplyCollect() on dataframe with list columns") say bytes column or binary column?
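For illustration, here is a sketch of the kind of test being suggested. The testthat and SparkR calls are standard, but the data setup is hypothetical and assumes createDataFrame maps a list-of-raw column to a Spark binary column:

# Hypothetical test: round-trip a binary (raw) column through dapplyCollect
test_that("dapplyCollect() on a DataFrame with a binary column", {
  b <- serialize(1:10, connection = NULL)  # a raw vector
  ldf <- data.frame(key = 1:2)
  ldf$bytes <- list(b, b)                  # list column holding raw vectors
  sdf <- createDataFrame(ldf)
  result <- dapplyCollect(sdf, function(x) x)
  expect_equal(result$bytes, ldf$bytes)
})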

@SparkQA

SparkQA commented Aug 31, 2016

Test build #64712 has finished for PR 14783 at commit 0c2a215.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 31, 2016

Test build #64737 has finished for PR 14783 at commit 77fa9b4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented Sep 1, 2016

Sorry, I think this was a breakage that I just fixed in #14904.

Jenkins, retest this please.

@shivaram
Contributor

shivaram commented Sep 1, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Sep 1, 2016

Test build #64756 has finished for PR 14783 at commit 77fa9b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

LGTM

@shivaram
Contributor

shivaram commented Sep 1, 2016

@sun-rui Any other comments?

@clarkfitzg
Contributor Author

I'm presenting something related to this on Thursday -- it would be nice to tell the audience this patch made it in. Can I do anything to help this along?

# Determine which columns of the first row are raw (binary) vectors
row1 <- inputData[[1]]
rawcolumns <- ("raw" == sapply(row1, class))

# rbind over a list of lists produces a matrix of mode "list",
# which preserves the raw vectors instead of coercing them
listmatrix <- do.call(rbind, inputData)
Contributor

Do you know what happens if we have a mixed set of columns here? i.e. say one column with "raw", one with "integer", and one with "character" -- from reading some docs it looks like everything is converted to a character matrix when we use rbind.

I think we have two choices if that's the case:
(a) we apply the type conversions after rbind
(b) we only call this method when all columns are raw

Contributor Author

> b = serialize(1:10, NULL)
> inputData = list(list(1L, b, 'a'), list(2L, b, 'b'))  # Mixed data types
> listmatrix <- do.call(rbind, inputData)
> listmatrix
     [,1] [,2]   [,3]
[1,] 1    Raw,62 "a"
[2,] 2    Raw,62 "b"
> class(listmatrix)
[1] "matrix"
> typeof(listmatrix)
[1] "list"
> is.character(listmatrix)
[1] FALSE

A little unusual -- it's a list matrix, hence the name. Which docs are you referring to?

The test that's in here now does test for mixed columns, but it doesn't test for a single column of raws. I'll add that now.

Contributor

I was looking at https://stat.ethz.ch/R-manual/R-devel/library/base/html/cbind.html, specifically the section "Value", which says:

The type of a matrix result determined from the highest type of any of the inputs in the hierarchy raw < logical < integer < double < complex < character < list.

Member

I think the correct class is maintained:

> sapply(listmatrix, class)
[1] "integer"   "integer"   "raw"       "raw"       "character" "character"
> sapply(listmatrix, typeof)
[1] "integer"   "integer"   "raw"       "raw"       "character" "character"

Contributor

Ah I see -- the types are inside the listmatrix. Thanks @clarkfitzg for clarifying. Let us know once you have added the test for a single column of raws as well.

Contributor Author

Since everything in inputData is a list, this goes straight to the top of the hierarchy -- same as if you called rbind(list1, list2, ...).
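A quick way to see this outside Spark:

# Every argument to rbind() here is a list, so the result is a matrix of
# mode "list" and the individual elements keep their original types.
m <- rbind(list(1L, "a"), list(2L, "b"))
typeof(m)         # "list"
sapply(m, class)  # "integer" "integer" "character" "character"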

@shivaram
Contributor

shivaram commented Sep 7, 2016

Sorry for the delay @clarkfitzg -- the code change looks pretty good to me. I just had one question about mixed-type columns.

# Single binary column (r1, r2, r3 and expected are defined earlier in this
# test file)
input <- list(list(r1), list(r2), list(r3))
expected <- subset(expected, select = "V2")
result <- setNames(rbindRaws(input), "V2")
Contributor Author

@shivaram Here's the new test. I made the other ones a bit more general also.

@SparkQA

SparkQA commented Sep 7, 2016

Test build #65027 has finished for PR 14783 at commit 91d69be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

shivaram commented Sep 7, 2016

Thanks for the update. LGTM. Merging this to master and branch-2.0

asfgit closed this in 9fccde4 on Sep 7, 2016
asfgit pushed a commit that referenced this pull request Sep 7, 2016
Fixed bug in `dapplyCollect` by changing the `compute` function of `worker.R` to explicitly handle raw (binary) vectors.

cc shivaram

Unit tests

Author: Clark Fitzgerald <[email protected]>

Closes #14783 from clarkfitzg/SPARK-16785.

(cherry picked from commit 9fccde4)
Signed-off-by: Shivaram Venkataraman <[email protected]>
@clarkfitzg
Contributor Author

Thanks!

@catlain

catlain commented Jun 2, 2017

Still have this issue when the input data is an array column whose vectors don't all have the same length, like:

head(test1)

               key              value
1 4dda7d68a202e9e3              1595297780
2  4e08f349deb7392              641991337
3 4e105531747ee00b              374773009
4 4f1d5ef7fdb4620a              2570136926
5 4f63a71e6dde04cd              2117602722
6 4fa2f96b689624fc              3489692062, 1344510747, 1095592237, 424510360, 3211239587

sparkR.stop()
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
spark_df = createDataFrame(sqlContext, test1)

# Fails
dapplyCollect(spark_df, function(x) x)

Caused by: org.apache.spark.SparkException: R computation failed with
 Error in (function (..., deparse.level = 1, make.row.names = TRUE, stringsAsFactors = default.stringsAsFactors())  : 
  invalid list argument: all variables should have the same length
	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
	at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
	at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:186)
	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:183)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more

# Works fine
spark_df <- selectExpr(spark_df, "key", "explode(value) value") 
dapplyCollect(spark_df, function(x) x)

                key         value
1  4dda7d68a202e9e3 1595297780
2   4e08f349deb7392  641991337
3  4e105531747ee00b  374773009
4  4f1d5ef7fdb4620a 2570136926
5  4f63a71e6dde04cd 2117602722
6  4fa2f96b689624fc 3489692062
7  4fa2f96b689624fc 1344510747
8  4fa2f96b689624fc 1095592237
9  4fa2f96b689624fc  424510360
10 4fa2f96b689624fc 3211239587

@felixcheung
Member

@catlain could you please open a JIRA?
Set the component to SparkR, like this one: https://issues.apache.org/jira/browse/SPARK-21068?filter=12333531

@catlain

catlain commented Jun 13, 2017

done
jira

@clarkfitzg
Contributor Author

This patch only handled raw columns, not the vector/array value columns. So maybe the original JIRA should stay open, or another one specific to this should be created.
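For reference, the failure above can be reproduced locally without Spark. The stack trace shows the worker funneling rows through rbind.data.frame, which requires every element of a list row to have the same length; the data setup below is illustrative:

# Rows whose list elements differ in length trigger the same error seen in
# the stack trace: "invalid list argument: all variables should have the
# same length".
rows <- list(list(key = "a", value = 1L),
             list(key = "b", value = c(2L, 3L)))
do.call(rbind.data.frame, rows)  # errors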
