-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-26830][SQL][R] Vectorized R dapply() implementation #23787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
FWIW, my todos on my current plate after this and #23760 are:
These are correctly blocked by both this and #23760 mostly to avoid conflict hell.. I filed JIRA under https://issues.apache.org/jira/browse/SPARK-26759 |
|
cc @cloud-fan too. |
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
Outdated
Show resolved
Hide resolved
This comment has been minimized.
This comment has been minimized.
22ca7e3 to
50f6b65
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
|
Test build #102477 has finished for PR 23787 at commit
|
|
I am sure this is ready for a review. Many codes are similar with both Scalar vectorized UDF and vectorized gapply(). Should be quite safe to go. |
|
gentle ping |
felixcheung
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed the R side
BryanCutler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing this @HyukjinKwon ! Looks pretty good from what I can tell. Possibly adding a few more tests here or in a followup might be good for some edge cases.
sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
Show resolved
Hide resolved
|
Test build #102719 has finished for PR 23787 at commit
|
|
retest this please |
|
Test build #102721 has finished for PR 23787 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
Show resolved
Hide resolved
|
Test build #102730 has finished for PR 23787 at commit
|
BryanCutler
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM from my end
|
retest this please |
|
Test build #102808 has finished for PR 23787 at commit
|
|
Merged to master. I am going to resolve the followups one by one. |
|
Thanks, @felixcheung and @BryanCutler |
This PR targets to add vectorized `dapply()` in R, Arrow optimization.
This can be tested as below:
```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```
```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```
- R 3.5.x
- Arrow package 0.12+
```bash
Rscript -e 'remotes::install_github("apache/arrowapache-arrow-0.12.0", subdir = "r")'
```
**Note:** currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
**Note:** currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
**Shall**
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```
**R code**
```r
rdf <- read.csv("500000.csv")
df <- cache(createDataFrame(rdf))
count(df)
test <- function() {
options(digits.secs = 6) # milliseconds
start.time <- Sys.time()
count(cache(dapply(df, function(rdf) { rdf }, schema(df))))
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
}
test()
```
**Data (350 MB):**
```r
object.size(read.csv("500000.csv"))
350379504 bytes
```
"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
**Results**
```
Time difference of 13.42037 mins
```
```
Time difference of 30.64156 secs
```
The performance improvement was around **2627%**.
- For now, Arrow optimization with R does not support when the data is `raw`, and when user explicitly gives float type in the schema. They produce corrupt values.
- Due to ARROW-4512, it cannot send and receive batch by batch. It has to send all batches in Arrow stream format at once. It needs improvement later.
Unit tests were added, and manually tested.
Closes apache#23787 from HyukjinKwon/SPARK-26830-1.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
## What changes were proposed in this pull request?
This PR targets to add vectorized `dapply()` in R, Arrow optimization.
This can be tested as below:
```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```
```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```
### Requirements
- R 3.5.x
- Arrow package 0.12+
```bash
Rscript -e 'remotes::install_github("apache/arrowapache-arrow-0.12.0", subdir = "r")'
```
**Note:** currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
**Note:** currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
### Benchmarks
**Shall**
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```
**R code**
```r
rdf <- read.csv("500000.csv")
df <- cache(createDataFrame(rdf))
count(df)
test <- function() {
options(digits.secs = 6) # milliseconds
start.time <- Sys.time()
count(cache(dapply(df, function(rdf) { rdf }, schema(df))))
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
}
test()
```
**Data (350 MB):**
```r
object.size(read.csv("500000.csv"))
350379504 bytes
```
"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
**Results**
```
Time difference of 13.42037 mins
```
```
Time difference of 30.64156 secs
```
The performance improvement was around **2627%**.
### Limitations
- For now, Arrow optimization with R does not support when the data is `raw`, and when user explicitly gives float type in the schema. They produce corrupt values.
- Due to ARROW-4512, it cannot send and receive batch by batch. It has to send all batches in Arrow stream format at once. It needs improvement later.
## How was this patch tested?
Unit tests were added, and manually tested.
Closes apache#23787 from HyukjinKwon/SPARK-26830-1.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
This PR targets to add vectorized `dapply()` in R, Arrow optimization.
This can be tested as below:
```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```
```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```
- R 3.5.x
- Arrow package 0.12+
```bash
Rscript -e 'remotes::install_github("apache/arrowapache-arrow-0.12.0", subdir = "r")'
```
**Note:** currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
**Note:** currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
**Shall**
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```
**R code**
```r
rdf <- read.csv("500000.csv")
df <- cache(createDataFrame(rdf))
count(df)
test <- function() {
options(digits.secs = 6) # milliseconds
start.time <- Sys.time()
count(cache(dapply(df, function(rdf) { rdf }, schema(df))))
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
}
test()
```
**Data (350 MB):**
```r
object.size(read.csv("500000.csv"))
350379504 bytes
```
"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
**Results**
```
Time difference of 13.42037 mins
```
```
Time difference of 30.64156 secs
```
The performance improvement was around **2627%**.
- For now, Arrow optimization with R does not support when the data is `raw`, and when user explicitly gives float type in the schema. They produce corrupt values.
- Due to ARROW-4512, it cannot send and receive batch by batch. It has to send all batches in Arrow stream format at once. It needs improvement later.
Unit tests were added, and manually tested.
Closes apache#23787 from HyukjinKwon/SPARK-26830-1.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
This PR targets to add vectorized `dapply()` in R, Arrow optimization.
This can be tested as below:
```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```
```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```
- R 3.5.x
- Arrow package 0.12+
```bash
Rscript -e 'remotes::install_github("apache/arrowapache-arrow-0.12.0", subdir = "r")'
```
**Note:** currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
**Note:** currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
**Shall**
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4g
```
```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4g
```
**R code**
```r
rdf <- read.csv("500000.csv")
df <- cache(createDataFrame(rdf))
count(df)
test <- function() {
options(digits.secs = 6) # milliseconds
start.time <- Sys.time()
count(cache(dapply(df, function(rdf) { rdf }, schema(df))))
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
}
test()
```
**Data (350 MB):**
```r
object.size(read.csv("500000.csv"))
350379504 bytes
```
"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
**Results**
```
Time difference of 13.42037 mins
```
```
Time difference of 30.64156 secs
```
The performance improvement was around **2627%**.
- For now, Arrow optimization with R does not support when the data is `raw`, and when user explicitly gives float type in the schema. They produce corrupt values.
- Due to ARROW-4512, it cannot send and receive batch by batch. It has to send all batches in Arrow stream format at once. It needs improvement later.
Unit tests were added, and manually tested.
Closes apache#23787 from HyukjinKwon/SPARK-26830-1.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
This PR targets to add vectorized
dapply()in R, Arrow optimization.This can be tested as below:
Requirements
Rscript -e 'remotes::install_github("apache/[email protected]", subdir = "r")'Note: currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
Note: currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
Benchmarks
Shall
sync && sudo purge ./bin/sparkR --conf spark.sql.execution.arrow.enabled=false --driver-memory 4gsync && sudo purge ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true --driver-memory 4gR code
Data (350 MB):
"500000 Records" http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
Results
The performance improvement was around 2627%.
Limitations
For now, Arrow optimization with R does not support when the data is
raw, and when user explicitly gives float type in the schema. They produce corrupt values.Due to ARROW-4512, it cannot send and receive batch by batch. It has to send all batches in Arrow stream format at once. It needs improvement later.
How was this patch tested?
Unit tests were added, and manually tested.