[SPARK-SQL] HiveTableScan operator Performance Improvement #456
Conversation
Can one of the admins verify this patch?

Jenkins, this is ok to test.

Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14261/

Mind fixing the tab characters so we can test this?

Sure, I am looking at it right now.

Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14295/

BTW, you can check style locally.

Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14300/

The problem was that I used vim for coding and it screwed up the tabbing for some reason. I ran sbt scalastyle and it now succeeds locally.

I am a little confused why it does not complain on my machine locally but produces errors here...

Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14304/
Typo in here
Build triggered.
Build started.
Build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14496/

Build triggered.
Build started.
Build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14497/
## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

Not filed upstream; this touches code for conda.

## What changes were proposed in this pull request?

rLibDir contains a sequence of possible paths for the SparkR package on the executor and is passed to the R daemon via the SPARKR_RLIBDIR environment variable. This PR filters rLibDir for paths that exist before setting SPARKR_RLIBDIR, retaining the existing behavior of preferring a YARN or local SparkR install over conda when both are present. See daemon.R: https://github.com/palantir/spark/blob/master/R/pkg/inst/worker/daemon.R#L23

Fixes apache#456

## How was this patch tested?

Manually tested, cherry-picked on an older version.
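As a minimal sketch of the filtering this describes (the paths and value names are illustrative assumptions, not the PR's exact code):

```
import java.io.File

// Hedged sketch, not the PR's exact code: keep only the candidate SparkR
// library directories that exist, preserving their priority order so a
// YARN or local SparkR install is still preferred over conda when both
// are present.
val rLibDir: Seq[String] =
  Seq("/opt/yarn/sparkr/lib", "/opt/conda/envs/spark/lib/R/library") // illustrative paths

val existing: Seq[String] = rLibDir.filter(dir => new File(dir).isDirectory)

// In Spark this value would be placed in the R daemon's environment;
// daemon.R then picks the first usable entry.
val sparkRLibDirValue: String = existing.mkString(",")
```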
It will add two periodic jobs of integration tests of helm with a native k8s cluster, using the v2.2.0 chart-testing tool:
1. helm 2.12.2 with kubernetes v1.12.7
2. helm 2.12.2 with kubernetes v1.13.4

Closes: theopenlab/openlab#212
Closes: theopenlab/openlab#213
…e functions into projections
### What changes were proposed in this pull request?
This PR filters out `ExtractValue`s that contain any aggregate function in the `NestedColumnAliasing` rule, to prevent aggregations from being pushed down into projections.
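As a rough sketch of the guard this implies (the helper name is an assumption, not the PR's exact code):

```
// Hypothetical helper, not the PR's exact code: an ExtractValue chain whose
// subtree contains an aggregate function must not be aliased into a child
// Project, because the aggregate can only be evaluated by an Aggregate node.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

def containsAggregateFunction(extractValueChain: Expression): Boolean =
  extractValueChain.find(_.isInstanceOf[AggregateExpression]).isDefined
```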
### Why are the changes needed?
To handle a corner/missed case in `NestedColumnAliasing` that can cause users to encounter a runtime exception.
Consider the following schema:
```
root
|-- a: struct (nullable = true)
| |-- c: struct (nullable = true)
| | |-- e: string (nullable = true)
| |-- d: integer (nullable = true)
|-- b: string (nullable = true)
```
and the query:
`SELECT MAX(a).c.e FROM (SELECT a, b FROM test_aggregates) GROUP BY b`
Executing the query before this PR will result in the error:
```
java.lang.UnsupportedOperationException: Cannot generate code for expression: max(input[0, struct<c:struct<e:string>,d:int>, true])
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotGenerateCodeForExpressionError(QueryExecutionErrors.scala:83)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode(Expression.scala:312)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode$(Expression.scala:311)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:99)
...
```
The optimised plan before this PR is:
```
'Aggregate [b#1], [_extract_e#5 AS max(a).c.e#3]
+- 'Project [max(a#0).c.e AS _extract_e#5, b#1]
+- Relation default.test_aggregates[a#0,b#1] parquet
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A new unit test in `NestedColumnAliasingSuite`. The test consists of the repro mentioned earlier.
The produced optimized plan is checked for equivalency with a plan of the form:
```
Aggregate [b#452], [max(a#451).c.e AS max('a)[c][e]#456]
+- LocalRelation <empty>, [a#451, b#452]
```
Closes #33921 from vicennial/spark-36677.
Authored-by: Venkata Sai Akhil Gudesa <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 2ed6e7b)
Signed-off-by: Liang-Chi Hsieh <[email protected]>
… scale < 0 (apache#456)" (apache#467)
This reverts commit 0017da5.
…AtomicCreateTableAsSelectExec (apache#456)
The goal is to improve the performance of the HiveTableScan operator.
As a quick benchmark, run the following code in the Scala interpreter:
```
scala> :paste
hql("CREATE TABLE IF NOT EXISTS sample (key1 INT, key2 INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/sample2.txt' INTO TABLE sample")
println("Result of SELECT * FROM sample:")
val start = System.nanoTime
val recs = hql("FROM sample SELECT key1, key2, value").collect()
val micros = (System.nanoTime - start) / 1000
println("%d microseconds".format(micros))
// press CTRL-D to run the pasted block
```
You can download the test file from here:
http://homes.cs.washington.edu/~soroush/sample2.txt
sample2.txt contains about 3.6 million rows. The improved code scans the entire table in about 9 seconds, while the original code scans it in about 22 seconds.
Regarding the last item in the task:
"Avoid Reading Unneeded Data - Some Hive Serializer/Deserializer (SerDe) interfaces support reading only the required columns from the underlying HDFS files. We should use ColumnProjectionUtils to configure these correctly."
The way to do it should be similar to the following code:
https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/TableScanOperator.scala
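As a rough, hedged illustration of what configuring `ColumnProjectionUtils` could look like (the column ids are illustrative, and the exact method name varies across Hive versions, e.g. `appendReadColumnIDs` in older releases):

```
// Hedged sketch, not working Spark code: tell a column-aware Hive SerDe,
// via the job Configuration, which column ids it actually needs to read.
import java.util.{ArrayList => JArrayList}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils

val hiveConf = new Configuration()
val neededColumnIds = new JArrayList[Integer]()
neededColumnIds.add(0) // key1
neededColumnIds.add(2) // value
ColumnProjectionUtils.appendReadColumns(hiveConf, neededColumnIds)
```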
I tried to take a similar approach, but I am not sure columnar reading is working in hiveOperators.scala right now. In any case, it will take me more time to make sure that last feature works. Please note that this was the first time I wrote code in Scala, so it took me some time to get comfortable with the language.