[SPARK-14242][CORE][Network] avoid copy in compositeBuffer for frame decoder #12038
Conversation
Review thread on the frame-assembly code in `TransportFrameDecoder`:

> `// Otherwise, create a composite buffer.`
> `CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer();`
Actually, we can set `maxNumComponents` for `compositeBuffer` to avoid the underlying consolidation, e.g. `CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);`, but this might not be a good choice.
Why is it not a good choice? With your change, you're replacing "maybe copy multiple times" with "always copy once". If there's a way to avoid the copy altogether, why not do it?
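For illustration, a minimal sketch of what an "always copy once" approach could look like (hypothetical code, not necessarily the earlier revision of this PR; it assumes the decoder's `LinkedList<ByteBuf> buffers` and a known frame size):

```java
import io.netty.buffer.ByteBuf;
import java.util.LinkedList;

final class SingleCopySketch {
  // Hypothetical: allocate one buffer of the full frame size and copy every
  // received chunk into it exactly once, then release the source chunks.
  static ByteBuf assembleByCopyingOnce(LinkedList<ByteBuf> buffers, int frameSize) {
    ByteBuf frame = buffers.getFirst().alloc().buffer(frameSize);
    while (!buffers.isEmpty()) {
      ByteBuf next = buffers.removeFirst();
      frame.writeBytes(next); // copies next's readable bytes into frame
      next.release();
    }
    return frame;
  }
}
```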
@vanzin, I'm not sure why Netty sets a maximum number of components underneath (the max is `Integer.MAX_VALUE`), and the default value is only 16, which seems very small before consolidation kicks in. Will it cause other problems when there are too many small buffers under the `compositeBuffer`? Is that why it consolidates once the number of small buffers reaches `maxNumComponents`?
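For reference, a small standalone sketch of the consolidation behavior being discussed, assuming Netty 4.0-era semantics where exceeding `maxNumComponents` triggers a consolidation copy; the 64 KB chunk size is only illustrative:

```java
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

final class ConsolidationSketch {
  public static void main(String[] args) {
    // Default maxNumComponents is 16; once a 17th component is added, the
    // composite consolidates: everything accumulated so far is copied into one
    // newly allocated buffer. For a large frame arriving in small chunks this
    // copy is repeated on ever-larger prefixes of the frame.
    CompositeByteBuf buf = Unpooled.compositeBuffer();
    for (int i = 0; i < 17; i++) {
      buf.addComponent(Unpooled.wrappedBuffer(new byte[64 * 1024]));
    }
    System.out.println("components after adding 17 chunks: " + buf.numComponents());
    buf.release();
  }
}
```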
Test build #54441 has finished for PR 12038 at commit

Maybe we can try to use

cc @vanzin

Test build #54492 has finished for PR 12038 at commit
That's better, but is it needed at all? I don't see any comments about why consolidating the buffers is a win in the source for CompositeByteBuf. Traversing the single buffer should be slightly faster because there's less bookkeeping, but there's the cost of copying that data in the first place. When testing this code, I remember that during large transfers packets would arrive in 64k chunks at the most, so that means that once you're transferring more than 1MB, you'd have to copy things. Have you tried not consolidating to see whether there's any negative side-effect?
If so, we can just set the `maxNumComponents`.
In my test, the chunk sizes are mainly around 20~30 KB.
I tested previously with

Ok, let's solve this issue without copy for any case.
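To put rough numbers on the copy cost being discussed, here is a back-of-the-envelope simulation (an editorial sketch: the ~30 KB chunk size reported above and a 100 MB frame are assumed, illustrative figures, and the model simply copies the whole accumulated prefix whenever the component count exceeds the default of 16):

```java
final class CopyCostSketch {
  public static void main(String[] args) {
    final long chunk = 30 * 1024;            // assumed chunk size (~30 KB)
    final long frame = 100L * 1024 * 1024;   // assumed frame size (100 MB)
    long copied = 0;
    long accumulated = 0;
    int components = 0;
    for (long received = 0; received < frame; received += chunk) {
      accumulated += chunk;
      components++;
      if (components > 16) {
        copied += accumulated; // consolidation copies the whole prefix again
        components = 1;        // the prefix collapses into a single component
      }
    }
    System.out.printf("frame ~ %d MB, bytes copied by repeated consolidation ~ %d MB%n",
        frame >> 20, copied >> 20);
  }
}
```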
Test build #54578 has finished for PR 12038 at commit

retest this please.

Test build #54580 has finished for PR 12038 at commit
@liyezhang556520 looks great, but could you update the commit message to reflect the actual change? Thanks!

LGTM. @liyezhang556520 please ping me when you update the PR description.

Merging to master. Thanks, @liyezhang556520!
cherry-picked into 1.6 branch

@davies, has this PR been cherry-picked into branch-1.6?

Yes

@davies, but I didn't find this commit in branch-1.6.

@davies, I didn't see the commit in branch-1.6 either; it seems this commit cannot simply be git cherry-picked because the file path is not the same in master and branch-1.6. Do I need to submit another PR for the back-port?
sorry, I forgot to push. it's in branch-1.6 now. |
What changes were proposed in this pull request?
In this patch, we set the initial `maxNumComponents` to `Integer.MAX_VALUE` instead of the default size (which is 16) when allocating the `compositeBuffer` in `TransportFrameDecoder`, because `compositeBuffer` will introduce too many memory copies underneath if it is left with the default `maxNumComponents` when the frame size is large (which results in many transport messages). For details, please refer to SPARK-14242.

How was this patch tested?

Spark unit tests and manual tests.

For manual tests, we can reproduce the performance issue with the following code:

`sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length`

It's easy to see the performance gain, both from the running time and the CPU usage.
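For completeness, a minimal sketch of the frame assembly with the raised `maxNumComponents` (an assumed helper shape; the actual `TransportFrameDecoder` code differs in its details, but the `compositeBuffer(Integer.MAX_VALUE)` call is the change described above):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.LinkedList;

final class CompositeFrameSketch {
  // Sketch of assembling a frame without copying: every received chunk becomes a
  // component of the composite, and with maxNumComponents = Integer.MAX_VALUE the
  // composite never consolidates (i.e. never copies) its components.
  static ByteBuf assembleFrame(LinkedList<ByteBuf> buffers) {
    CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
    while (!buffers.isEmpty()) {
      ByteBuf next = buffers.removeFirst();
      frame.addComponent(next);
      // addComponent(ByteBuf) does not advance the writer index, so move it
      // explicitly to make the new component's bytes readable from the composite.
      frame.writerIndex(frame.writerIndex() + next.readableBytes());
    }
    return frame;
  }
}
```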