Skip to content

Conversation

@ulysses-you
Copy link
Contributor

@ulysses-you ulysses-you commented Oct 18, 2021

What changes were proposed in this pull request?

Unify the compare function of UTF8String and ByteArray.

Why are the changes needed?

BinaryType use TypeUtils.compareBinary to compare two byte array, however it's slow since it compares byte array using unsigned int comparison byte by bye.

We can compare them using Platform.getLong with unsigned long comparison if they have more than 8 bytes. And here is some histroy about this TODO https://github.com/apache/spark/pull/6755/files#r32197461

The benchmark result should be same with UTF8String, can be found in #19180 (#19180 (comment))

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Move test from TypeUtilsSuite to ByteArraySuite
  • Benchmark result

JDK8:

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            636            661          14        103.0           9.7       1.0X
8-16 byte                                          1067           1112          21         61.4          16.3       0.6X
16-32 byte                                         1226           1352          98         53.4          18.7       0.5X
512-1024 byte                                      1803           1916          46         36.3          27.5       0.4X
512 byte slow                                      4343           4662         171         15.1          66.3       0.1X
2-7 byte                                           1075           1119          26         61.0          16.4       0.6X

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1511           1570          30         43.4          23.1       1.0X
8-16 byte                                          1522           1564          27         43.1          23.2       1.0X
16-32 byte                                         1426           1554          36         46.0          21.8       1.1X
512-1024 byte                                      2080           2198          86         31.5          31.7       0.7X
512 byte slow                                     28498          29222         410          2.3         434.9       0.1X
2-7 byte                                           1382           1485          61         47.4          21.1       1.1X

JDK11

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            720            777          21         91.0          11.0       1.0X
8-16 byte                                          1077           1138          32         60.8          16.4       0.7X
16-32 byte                                         1347           1463          84         48.7          20.5       0.5X
512-1024 byte                                      1898           1989          40         34.5          29.0       0.4X
512 byte slow                                      4621           4878         168         14.2          70.5       0.2X
2-7 byte                                           1062           1133          28         61.7          16.2       0.7X

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1377           1471          37         47.6          21.0       1.0X
8-16 byte                                          1398           1475          38         46.9          21.3       1.0X
16-32 byte                                         1452           1547          47         45.2          22.1       0.9X
512-1024 byte                                      1826           1953          55         35.9          27.9       0.8X
512 byte slow                                     45883          47146         NaN          1.4         700.1       0.0X
2-7 byte                                           1401           1484          39         46.8          21.4       1.0X

@github-actions github-actions bot added the SQL label Oct 18, 2021
@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48830/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48830/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Test build #144352 has finished for PR 34310 at commit 65a8e64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48837/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48837/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Test build #144362 has finished for PR 34310 at commit a0b1c56.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Copy link
Contributor Author

retest this please

@ulysses-you
Copy link
Contributor Author

cc @xkrogen @srowen @kiszk @cloud-fan @JoshRosen if you have time to take another look

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48842/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48842/

@SparkQA
Copy link

SparkQA commented Oct 18, 2021

Test build #144367 has finished for PR 34310 at commit a0b1c56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

public static int compareBinary(byte[] leftBase, byte[] rightBase) {
return compareBinary(leftBase, Platform.BYTE_ARRAY_OFFSET, leftBase.length,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm only wondering if this ends up being slower - you already have byte arrays, and now have to go through platform methods to read them?

Copy link
Contributor

@JoshRosen JoshRosen Oct 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

It seems plausible that the new version will be faster, but it's probably a good idea to run a quick benchmark to confirm. There's a UTF8StringBenchmark linked from #19180 (comment) : maybe we could adapt that to work on byte arrays and do a quick before-and-after comparison to just to double check?

Edit: just to clarify: I noticed that this benchmark is also linked in the PR description. As Sean points out, I think the key difference in this PR is whether we're using getByte() versus directly accessing the on-heap byte array (in the linked UTF8String benchmark, both the old and new code were using getByte()).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @srowen and @JoshRosen for point out the difference. I follow the linked benchmark but add a new 512 byte slow benchmark which the first 511 bytes are same. The benchmark result shows it has no regression after this PR and has big benifits if the byte arrays have many same prefix.

Before this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_271-b09 on Mac OS X 10.16
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            800            861          70         81.9          12.2       1.0X
8-16 byte                                           810            878          59         80.9          12.4       1.0X
16-32 byte                                          804            887          40         81.5          12.3       1.0X
512-1024 byte                                      1050           1181          43         62.4          16.0       0.8X
512 byte slow                                     23593          23698         311          2.8         360.0       0.0X
2-7 byte                                            778            784           5         84.2          11.9       1.0X

After this PR:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_271-b09 on Mac OS X 10.16
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            425            471          24        154.2           6.5       1.0X
8-16 byte                                           751            814          40         87.2          11.5       0.5X
16-32 byte                                          789            842          42         83.1          12.0       0.5X
512-1024 byte                                      1038           1175         193         63.1          15.8       0.4X
512 byte slow                                      3419           3924         NaN         19.2          52.2       0.1X
2-7 byte                                            421            424           2        155.6           6.4       1.0X

@SparkQA
Copy link

SparkQA commented Oct 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48868/

@SparkQA
Copy link

SparkQA commented Oct 19, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48868/

@SparkQA
Copy link

SparkQA commented Oct 19, 2021

Test build #144394 has finished for PR 34310 at commit 15b8efe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

benchmark.addCase("16-32 byte")(compareBinary(dataMedium))
benchmark.addCase("512-1024 byte")(compareBinary(dataLarge))
benchmark.addCase("512 byte slow")(compareBinary(dataLargeSlow))
benchmark.addCase("2-7 byte")(compareBinary(dataTiny))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this this case is listed twice. Maybe drop this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first benchmark case may run slower than the latter due to the JIT optimization and this case has small size which can be done in a short time that would be more likely affected.

So I also keep it running twice in case this issue.

byte array comparisons
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 11.0.12+8-LTS-237 on Mac OS X 11.5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you run the benchmarks in GitHub Actions according to the instructions at https://spark.apache.org/developer-tools.html#github-workflow-benchmarks and then include those results in this PR in place of these ones? This helps to ensure that checked-in benchmark results come from a consistent environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @JoshRosen , it's really a good tool for me. Updated the benchmark result from GA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also add the benchmark tool guide in pull request template, #34349

@SparkQA
Copy link

SparkQA commented Oct 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48955/

@SparkQA
Copy link

SparkQA commented Oct 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48955/

@SparkQA
Copy link

SparkQA commented Oct 21, 2021

Test build #144483 has finished for PR 34310 at commit f9f11d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -0,0 +1,16 @@
================================================================================================
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have 'before' numbers for these? you don't need to include them just want to verify that it also seemed to show an improvement like your local laptop one did

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the old code path benchmark result:

JDK8

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            799            836          24         82.0          12.2       1.0X
8-16 byte                                           832            906          32         78.8          12.7       1.0X
16-32 byte                                          812            854          28         80.7          12.4       1.0X
512-1024 byte                                      1057           1088          20         62.0          16.1       0.8X
512 byte slow                                     24628          26054         NaN          2.7         375.8       0.0X
2-7 byte                                            811            849          23         80.8          12.4       1.0X

JDK11

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compareTo:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            771            812          28         85.0          11.8       1.0X
8-16 byte                                           839            857          13         78.1          12.8       0.9X
16-32 byte                                          898            926          17         73.0          13.7       0.9X
512-1024 byte                                      1141           1189          23         57.4          17.4       0.7X
512 byte slow                                     40124          40689         495          1.6         612.2       0.0X
2-7 byte                                            827            847          14         79.3          12.6       0.9X

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shows we still have the benefits with GA env.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I just notice the env of GA is still different. The two benchmark result based on:

Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm inclined to believe it is a win based on your first benchmark. Is there any easy way to run before/after on these Xeons, or is that hard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I compared the two code path within one patch, and here is the result.

JDK8:

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            636            661          14        103.0           9.7       1.0X
8-16 byte                                          1067           1112          21         61.4          16.3       0.6X
16-32 byte                                         1226           1352          98         53.4          18.7       0.5X
512-1024 byte                                      1803           1916          46         36.3          27.5       0.4X
512 byte slow                                      4343           4662         171         15.1          66.3       0.1X
2-7 byte                                           1075           1119          26         61.0          16.4       0.6X

OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1511           1570          30         43.4          23.1       1.0X
8-16 byte                                          1522           1564          27         43.1          23.2       1.0X
16-32 byte                                         1426           1554          36         46.0          21.8       1.1X
512-1024 byte                                      2080           2198          86         31.5          31.7       0.7X
512 byte slow                                     28498          29222         410          2.3         434.9       0.1X
2-7 byte                                           1382           1485          61         47.4          21.1       1.1X

JDK11

================================================================================================
byte array comparisons
================================================================================================

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare offHeap:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                            720            777          21         91.0          11.0       1.0X
8-16 byte                                          1077           1138          32         60.8          16.4       0.7X
16-32 byte                                         1347           1463          84         48.7          20.5       0.5X
512-1024 byte                                      1898           1989          40         34.5          29.0       0.4X
512 byte slow                                      4621           4878         168         14.2          70.5       0.2X
2-7 byte                                           1062           1133          28         61.7          16.2       0.7X

OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
Byte Array compare onHeap:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
2-7 byte                                           1377           1471          37         47.6          21.0       1.0X
8-16 byte                                          1398           1475          38         46.9          21.3       1.0X
16-32 byte                                         1452           1547          47         45.2          22.1       0.9X
512-1024 byte                                      1826           1953          55         35.9          27.9       0.8X
512 byte slow                                     45883          47146         NaN          1.4         700.1       0.0X
2-7 byte                                           1401           1484          39         46.8          21.4       1.0X

@srowen srowen closed this in 74d974a Oct 24, 2021
@srowen
Copy link
Member

srowen commented Oct 24, 2021

Merged to master

@ulysses-you ulysses-you deleted the SPARK-37037 branch October 25, 2021 01:20
@ulysses-you
Copy link
Contributor Author

thank you @srowen and @JoshRosen !

@kiszk
Copy link
Member

kiszk commented Oct 25, 2021

late LGTM

wangyum pushed a commit that referenced this pull request May 26, 2023
* [SPARK-36992][SQL] Improve byte array sort perf by unify getPrefix function of UTF8String and ByteArray

### What changes were proposed in this pull request?

Unify the getPrefix function of `UTF8String` and `ByteArray`.

### Why are the changes needed?

When execute sort operator, we first compare the prefix. However the getPrefix function of byte array is slow. We use first 8 bytes as the prefix, so at most we will call 8 times with `Platform.getByte` which is slower than call once with `Platform.getInt` or `Platform.getLong`.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

pass `org.apache.spark.util.collection.unsafe.sort.PrefixComparatorsSuite`

Closes #34267 from ulysses-you/binary-prefix.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-37037][SQL] Improve byte array sort by unify compareTo function of UTF8String and ByteArray

### What changes were proposed in this pull request?

Unify the compare function of `UTF8String` and `ByteArray`.

### Why are the changes needed?

`BinaryType` use `TypeUtils.compareBinary` to compare two byte array, however it's slow since it compares byte array using unsigned int comparison byte by bye.

We can compare them using `Platform.getLong` with unsigned long comparison if they have more than 8 bytes. And here is some histroy about this `TODO` https://github.com/apache/spark/pull/6755/files#r32197461

The benchmark result should be same with `UTF8String`, can be found in #19180 (#19180 (comment))

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Move test from `TypeUtilsSuite` to `ByteArraySuite`

Closes #34310 from ulysses-you/SPARK-37037.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-37341][SQL] Avoid unnecessary buffer and copy in full outer sort merge join

### What changes were proposed in this pull request?

FULL OUTER sort merge join (non-code-gen path) [copies join keys and buffers input rows, even when rows from both sides do not have matched keys](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L1637-L1641). This is unnecessary, as we can just output the row with smaller join keys, and only buffer when both sides have matched keys. This would save us from unnecessary copy and buffer, when both join sides have a lot of rows not matched with each other.

### Why are the changes needed?

Improve query performance for FULL OUTER sort merge join when code-gen is disabled.
This would benefit query when both sides have a lot of rows not matched, and join key is big in terms of size (e.g. string type).

Example micro benchmark:

```
  def sortMergeJoin(): Unit = {
    val N = 2 << 20
    codegenBenchmark("sort merge join", N) {
      val df1 = spark.range(N).selectExpr(s"cast(id * 15485863 as string) as k1")
      val df2 = spark.range(N).selectExpr(s"cast(id * 15485867 as string) as k2")
      val df = df1.join(df2, col("k1") === col("k2"), "full_outer")
      assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
      df.noop()
    }
  }
```

Seeing run-time improvement over 60%:

```
Running benchmark: sort merge join
  Running case: sort merge join without optimization
  Stopped after 5 iterations, 10026 ms
  Running case: sort merge join with optimization
  Stopped after 5 iterations, 5954 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.16
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
sort merge join:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
sort merge join without optimization               1807           2005         157          1.2         861.4       1.0X
sort merge join with optimization                  1135           1191          62          1.8         541.1       1.6X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests e.g. `OuterJoinSuite.scala`.

Closes #34612 from c21/smj-fix.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-37447][SQL] Cache LogicalPlan.isStreaming() result in a lazy val

### What changes were proposed in this pull request?

This PR adds caching to `LogicalPlan.isStreaming()`: the default implementation's result will now be cached in a `private lazy val`.

### Why are the changes needed?

This improves the performance of the `DeduplicateRelations` analyzer rule.

The default implementation of `isStreaming` recursively visits every node in the tree. `DeduplicateRelations.renewDuplicatedRelations` is recursively invoked on every node in the tree and each invocation calls `isStreaming`. This leads to `O(n^2)` invocations of `isStreaming` on leaf nodes.

Caching `isStreaming` avoids this performance problem.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Correctness should be covered by existing tests.

This significantly improved `DeduplicateRelations` performance in local microbenchmarking with large query plans (~20% reduction in that rule's runtime in one of my tests).

Closes #34691 from JoshRosen/cache-LogicalPlan.isStreaming.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-37530][CORE] Spark reads many paths very slow though newAPIHadoopFile

### What changes were proposed in this pull request?

Same as #18441, we parallelize FileInputFormat.listStatus for newAPIHadoopFile

### Why are the changes needed?

![image](https://user-images.githubusercontent.com/8326978/144562490-d8005bf2-2052-4b50-9a5d-8b253ee598cc.png)

Spark can be slow when accessing external storage at driver side, improve perf by parallelizing

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

passing GA

Closes #34792 from yaooqinn/SPARK-37530.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>

* [SPARK-37592][SQL] Improve performance of `JoinSelection`

When I reading the implement of AQE, I find the process select join with hint exists a lot cumbersome code.

The join hint has a relatively high learning curve for users, so the SQL not  contains join hint in more cases.

Improve performance of `JoinSelection`

'No'.
Just change the inner implement.

Jenkins test.

Closes #34844 from beliefer/SPARK-37592-new.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-37646][SQL] Avoid touching Scala reflection APIs in the lit function

### What changes were proposed in this pull request?

This PR proposes to avoid touching Scala reflection APIs in the lit function.

### Why are the changes needed?

Currently `lit` calls `typedlit[Any]` and touches Scala reflection APIs unnecessarily. As Scala reflection APIs touch multiple global locks and they are pretty slow when the parallelism is pretty high.

This PR inlines `typedlit` to `lit` and replaces `Literal.create` with `Literal.apply` to avoid touching Scala reflection APIs. There is no behavior change.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- New unit tests.
- Manually ran the test in https://issues.apache.org/jira/browse/SPARK-37646 and saw no difference between `new Column(Literal(0L))` and `lit(0L)`.

Closes #34901 from zsxwing/SPARK-37646.

Lead-authored-by: Shixiong Zhu <[email protected]>
Co-authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>

* [SPARK-37689][SQL] Expand should be supported in PropagateEmptyRelation

We  meet a case that when there is a empty relation, HashAggregateExec still triggered to execute and return an empty result. It's not necessary.
![image](https://user-images.githubusercontent.com/46485123/146725110-27496536-f1f7-4fac-ae2c-0f6f81159bba.png)
It's caused by there is an  `Expand(EmptyLocalRelation())`, and it's not propagated,  this pr support propagate `Expand` with empty LocalRelation

Avoid unnecessary execution.

No

Added UT

Closes #34954 from AngersZhuuuu/SPARK-37689.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-36406][CORE] Avoid unnecessary file operations before delete a write failed file held by DiskBlockObjectWriter

We always do file truncate operation before delete a write failed file held by `DiskBlockObjectWriter`, a typical process is as follows:

```
if (!success) {
  // This code path only happens if an exception was thrown above before we set success;
  // close our stuff and let the exception be thrown further
  writer.revertPartialWritesAndClose()
  if (file.exists()) {
    if (!file.delete()) {
      logWarning(s"Error deleting ${file}")
    }
  }
}
```
The `revertPartialWritesAndClose` method will reverts writes that haven't been committed yet,  but it doesn't seem necessary in the current scene.

So this pr add a new method  to `DiskBlockObjectWriter` named `closeAndDelete()`,  the new method just revert write metrics and delete the write failed file.

Avoid unnecessary file operations.

Add a new method  to `DiskBlockObjectWriter` named `closeAndDelete().

Pass the Jenkins or GitHub Action

Closes #33628 from LuciferYang/SPARK-36406.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: attilapiros <[email protected]>

* [SPARK-37462][CORE] Avoid unnecessary calculating the number of outstanding fetch requests and RPCS

Avoid unnecessary calculating the number of outstanding fetch requests and RPCS

It is unnecessary to calculate the number of outstanding fetch requests and RPCS when the IdleStateEvent is not IDLE or the last request is not timeout.

No.
Exist unittests.

Closes #34711 from weixiuli/SPARK-37462.

Authored-by: weixiuli <[email protected]>
Signed-off-by: Sean Owen <[email protected]>

Co-authored-by: ulysses-you <[email protected]>
Co-authored-by: Cheng Su <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Co-authored-by: Jiaan Geng <[email protected]>
Co-authored-by: Shixiong Zhu <[email protected]>
Co-authored-by: Shixiong Zhu <[email protected]>
Co-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: yangjie01 <[email protected]>
Co-authored-by: weixiuli <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants