@cxzl25 cxzl25 commented Aug 13, 2024

What changes were proposed in this pull request?

This PR aims to support separate buffer size configuration in UnsafeShuffleWriter.

Introduce spark.shuffle.file.merge.buffer configuration.

Why are the changes needed?

UnsafeShuffleWriter#mergeSpillsWithFileStream uses spark.shuffle.file.buffer as the buffer for reading spill files, and this buffer is an off-heap buffer.

When writing spills, a larger buffer is desirable. But UnsafeShuffleWriter#mergeSpillsWithFileStream opens one input stream per spill file, each with its own off-heap buffer of that size, so when there are many spill files the total off-heap allocation becomes large and the executor can easily be killed by YARN.

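    for (int i = 0; i < spills.length; i++) {
      // One buffered input stream, and so one off-heap buffer, per spill file.
      spillInputStreams[i] = new NioBufferedFileInputStream(
        spills[i].file,
        inputBufferSizeInBytes);
      // ... (rest of the loop body not shown)
    }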
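
To make the scaling concrete, here is a minimal arithmetic sketch. It assumes, as in the loop above, one read buffer per spill file. The class and method names (`MergeBufferEstimate`, `totalBufferBytes`) and the spill counts are hypothetical and only for illustration; they are not part of Spark or of this PR.

```java
/** Hypothetical helper: estimates the read-buffer memory needed to merge spill files. */
final class MergeBufferEstimate {
  private MergeBufferEstimate() {}

  /**
   * Total bytes of read buffers, assuming each spill file gets exactly one
   * buffer of bufferBytesPerSpill bytes (the pattern in the loop above).
   */
  static long totalBufferBytes(int numSpills, long bufferBytesPerSpill) {
    if (numSpills < 0 || bufferBytesPerSpill < 0) {
      throw new IllegalArgumentException("arguments must be non-negative");
    }
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact((long) numSpills, bufferBytesPerSpill);
  }

  public static void main(String[] args) {
    long kib = 1024L;
    long mib = 1024L * kib;
    // Illustrative numbers only: 2048 spill files with a 32 KiB vs. a 1 MiB buffer.
    System.out.println(totalBufferBytes(2048, 32 * kib) / mib + " MiB");
    System.out.println(totalBufferBytes(2048, mib) / mib + " MiB");
  }
}
```

The total grows linearly with both the number of spill files and the buffer size, which is why a buffer size that works well for writing spills can be too large for the merge step.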

Does this PR introduce any user-facing change?

No

How was this patch tested?

Production environment verification

Was this patch authored or co-authored using generative AI tooling?

No

@mridulm mridulm left a comment

The change looks reasonable.
+CC @JoshRosen as well.

As a side note, should we be using Platform.allocateDirectBuffer for NioBufferedFileInputStream as well @JoshRosen ?

@JoshRosen JoshRosen left a comment

LGTM, as this looks reasonable to me as well: this is a bit of a low-level configuration but it seems fine to allow it to be tuned.


> As a side note, should we be using Platform.allocateDirectBuffer for NioBufferedFileInputStream as well @JoshRosen ?

The difference between ByteBuffer.allocateDirect and Platform.allocateDirectBuffer is that the latter bypasses / ignores the JVM's -XX:MaxDirectMemorySize limit.

Given that almost all other Spark-initiated allocations use the Platform version, we probably should make that change.

That said, I also spotted another ByteBuffer.allocateDirect usage, in ReadableChannelFileRegion:

    private val buffer = ByteBuffer.allocateDirect(64 * 1024)
    buffer.flip()

That call may also need additional StorageUtils.dispose() calls. ReadableChannelFileRegion is only used by EncryptedBlockData, as far as I know, so perhaps it would be better to update both of those in a separate PR instead of doing it here.
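
As a rough illustration of the `ByteBuffer.allocateDirect` side of that comparison, the sketch below uses only the JDK to read the JVM's "direct" buffer pool after an allocation. The class name `DirectPoolProbe` is hypothetical. It does not call `Platform.allocateDirectBuffer` (which lives in Spark, not the JDK), so it does not verify how Spark's allocator behaves; it only shows that buffers from `ByteBuffer.allocateDirect` are accounted for by the JVM, which is the accounting that `-XX:MaxDirectMemorySize` limits.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Hypothetical probe: reads the JVM's accounting for direct byte buffers. */
final class DirectPoolProbe {
  private DirectPoolProbe() {}

  /** Bytes currently accounted to the JVM's "direct" buffer pool, or -1 if unavailable. */
  static long directPoolBytesUsed() {
    for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
      if ("direct".equals(pool.getName())) {
        return pool.getMemoryUsed();
      }
    }
    return -1L;
  }

  public static void main(String[] args) {
    long before = directPoolBytesUsed();
    // Allocated through the JDK, so the JVM tracks it in the "direct" pool.
    ByteBuffer buf = ByteBuffer.allocateDirect(8 * 1024 * 1024);
    long after = directPoolBytesUsed();
    System.out.println("direct pool: " + before + " -> " + after
        + " bytes after allocating " + buf.capacity() + " bytes");
  }
}
```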

cxzl25 commented Aug 22, 2024

I also found another one, which uses Platform.allocateDirectBuffer when initializing, but uses ByteBuffer.allocateDirect when growing.

We can do this in another PR.

    // Initial allocation goes through Platform:
    private[spark] class DirectByteBufferOutputStream(capacity: Int) extends OutputStream {
      private var buffer = Platform.allocateDirectBuffer(capacity)

    // ... but growing the buffer uses ByteBuffer.allocateDirect:
      val newBuffer = ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(oldBuffer)
      StorageUtils.dispose(oldBuffer)
      buffer = newBuffer
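
One way to avoid this kind of mismatch is to route both the initial allocation and every growth step through a single allocator. The following is a minimal JDK-only sketch of that idea, not the Spark implementation: `GrowableDirectBuffer` and its injected `allocator` are hypothetical names, the doubling policy is an assumption, and the old buffer is simply left to the garbage collector here, whereas the Spark code above calls `StorageUtils.dispose`.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

/** Hypothetical sketch: initial allocation and growth share one injected allocator. */
final class GrowableDirectBuffer {
  private final IntFunction<ByteBuffer> allocator;
  private ByteBuffer buffer;

  GrowableDirectBuffer(int capacity, IntFunction<ByteBuffer> allocator) {
    this.allocator = allocator;
    this.buffer = allocator.apply(capacity);
  }

  void write(byte[] bytes) {
    // addExact throws ArithmeticException if the requested size overflows an int.
    ensureCapacity(Math.addExact(buffer.position(), bytes.length));
    buffer.put(bytes);
  }

  private void ensureCapacity(int minCapacity) {
    if (minCapacity <= buffer.capacity()) {
      return;
    }
    // Double until large enough; computed as long so the shift cannot overflow.
    long newCapacity = Math.max(buffer.capacity(), 1);
    while (newCapacity < minCapacity) {
      newCapacity <<= 1;
    }
    ByteBuffer oldBuffer = buffer;
    oldBuffer.flip(); // switch the old buffer to read mode before copying
    // Same allocator as the constructor, so both paths behave consistently.
    ByteBuffer newBuffer = allocator.apply((int) Math.min(newCapacity, Integer.MAX_VALUE));
    newBuffer.put(oldBuffer);
    buffer = newBuffer; // the old buffer is left to the GC in this sketch
  }

  int size() { return buffer.position(); }

  int capacity() { return buffer.capacity(); }

  byte[] toByteArray() {
    ByteBuffer view = buffer.duplicate(); // independent position/limit, shared content
    view.flip();
    byte[] out = new byte[view.remaining()];
    view.get(out);
    return out;
  }
}
```

A caller could pass `ByteBuffer::allocateDirect` or any other allocator with the same shape; the point of the design is that the choice is made once instead of separately at each call site.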

@mridulm mridulm closed this in d84f1a3 Aug 23, 2024
mridulm commented Aug 23, 2024

Merged to master.
Thanks for fixing this @cxzl25 !
Thanks for the review @JoshRosen :-)

mridulm commented Aug 23, 2024

@cxzl25, please do verify if the jira has been updated correctly - thanks !

dongjoon-hyun pushed a commit that referenced this pull request Sep 4, 2024
…yteBuffer.allocateDirect`

### What changes were proposed in this pull request?
This PR aims to use `Platform.allocateDirectBuffer` instead of `ByteBuffer.allocateDirect`.

### Why are the changes needed?
#47733 (review)

Allocating off-heap memory should use the `allocateDirectBuffer` API provided by `Platform`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47987 from cxzl25/SPARK-49509.

Authored-by: sychen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Sep 4, 2024
…yteBuffer.allocateDirect`

(cherry picked from commit 2ed6c3e)
Signed-off-by: Dongjoon Hyun <[email protected]>