[SPARK-26674][CORE] Consolidate CompositeByteBuf when reading large frame #23602
Conversation
Do you have a sense of how much time the consolidation takes vs memory saved? Just trying to get a handle on what the tradeoff is here. It's probably a good change. Any other places we can do this?
@srowen Thanks for the suggestion. Currently it's just a thought, without much sense of the tradeoff; it seems the memory saved depends on the readable size of the socket buffer and the memory allocation strategy (default ...
I have done some benchmark tests on my local machine; it seems consolidation can save a large amount of memory at a small time cost -- roughly 200 millis for a 50% saving on a 1GB CompositeByteBuf.

Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Linux 4.15.0-43-generic

[test consolidate 100 buffers each with 10m, 50% used for 1 loop]
[test consolidate 100 buffers each with 10m, 100% used for 1 loop]
[test consolidate 100 buffers each with 10m, 50% used for 10 loop]
[test consolidate 100 buffers each with 10m, 100% used for 10 loop]
[test consolidate 100 buffers each with 10m, 50% used for 50 loop]
[test consolidate 100 buffers each with 10m, 100% used for 50 loop]
[test consolidate 20 buffers each with 50m, 50% used for 1 loop]
[test consolidate 20 buffers each with 50m, 100% used for 1 loop]
[test consolidate 20 buffers each with 50m, 50% used for 10 loop]
[test consolidate 20 buffers each with 50m, 100% used for 10 loop]
[test consolidate 20 buffers each with 50m, 50% used for 50 loop]
[test consolidate 20 buffers each with 50m, 100% used for 50 loop]
[test consolidate 10 buffers each with 100m, 50% used for 1 loop]
[test consolidate 10 buffers each with 100m, 100% used for 1 loop]
[test consolidate 10 buffers each with 100m, 50% used for 10 loop]
[test consolidate 10 buffers each with 100m, 100% used for 10 loop]
[test consolidate 10 buffers each with 100m, 50% used for 50 loop]
[test consolidate 10 buffers each with 100m, 100% used for 50 loop]
The benchmark code is as below:
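A minimal sketch of such a consolidation timing (assuming Netty's Unpooled API; the buffer counts and sizes are illustrative, smaller than the 100 x 10 MB cases listed above, and this is not the author's original benchmark):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ConsolidateBenchmark {
  public static void main(String[] args) {
    int numBuffers = 20;
    int bufSize = 10 * 1024 * 1024;   // 10 MB backing buffer per component
    int usedBytes = bufSize / 2;      // only 50% of each buffer actually written

    CompositeByteBuf composite = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    for (int i = 0; i < numBuffers; i++) {
      ByteBuf component = Unpooled.buffer(bufSize);
      component.writerIndex(usedBytes);
      composite.addComponent(true, component);
    }

    long start = System.currentTimeMillis();
    composite.consolidate();  // copies all components into one backing buffer
    long elapsed = System.currentTimeMillis() - start;

    System.out.println("capacity after consolidation: " + composite.capacity()
        + " bytes, took " + elapsed + " ms");
    composite.release();
  }
}
```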
The only risk now is that while doing the consolidation, the memory will double before it's done. The default maxComponents value is 16, and if the number of components exceeds this threshold, consolidation happens. Since the default socket buffer is small (usually <1M, according to net.ipv4.tcp_rmem), it's safe.
Interesting, see #12038 where there is a lot of discussion about this; it looks to be pretty on purpose. @vanzin and @liyezhang556520 discussed it. |
@srowen I just ran a benchmark test for the above code, and it's true that it's rather slow. The test report is as below:

// --- Test Reports for plan 2 ------
But I came up with another idea: we can just check the writerIndex of the CompositeByteBuf, and if the delta exceeds some threshold we do a consolidation. How do we set a reasonable threshold that takes good care of both performance and memory?
1. Estimate the memoryOverhead for shuffle.
2. Estimate the threshold upon the shuffle memoryOverhead. This is a conservative estimation; in most cases we could make the threshold higher, but we keep it unchanged for better safety. Then, say we fetch a 1GB shuffle block (memoryOverhead should be larger), we get at least 300M as the threshold.
3. How about the performance?
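A rough sketch of the delta-threshold idea (assuming Netty's CompositeByteBuf API; the threshold value and names here are illustrative, not the final implementation):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class DeltaThresholdConsolidation {
  // Hypothetical threshold: consolidate once the un-consolidated part exceeds 1 MB.
  private static final long DELTA_THRESHOLD = 1024 * 1024;

  private long consolidatedSize = 0;
  private int consolidatedComponents = 0;

  // Add one network read to the frame, and consolidate only the new components
  // when the delta since the last consolidation exceeds the threshold.
  void addAndMaybeConsolidate(CompositeByteBuf frameBuf, ByteBuf next) {
    frameBuf.addComponent(true, next);
    if (frameBuf.capacity() - consolidatedSize > DELTA_THRESHOLD) {
      int newComponents = frameBuf.numComponents() - consolidatedComponents;
      frameBuf.consolidate(consolidatedComponents, newComponents);
      consolidatedSize = frameBuf.capacity();
      consolidatedComponents = frameBuf.numComponents();
    }
  }

  public static void main(String[] args) {
    DeltaThresholdConsolidation demo = new DeltaThresholdConsolidation();
    CompositeByteBuf frameBuf = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    for (int i = 0; i < 100; i++) {
      // Simulated socket read: 64 KB buffer, half of it carrying frame bytes.
      ByteBuf read = Unpooled.buffer(64 * 1024);
      read.writerIndex(32 * 1024);
      demo.addAndMaybeConsolidate(frameBuf, read);
    }
    System.out.println("components: " + frameBuf.numComponents()
        + ", readable: " + frameBuf.readableBytes());
    frameBuf.release();
  }
}
```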
It seems we can gain a huge memory saving with little time spent (at most ~500 millis for a 1GB shuffle). This method has many advantages:
```java
  return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
}

/** The threshold for consolidation, it is derived upon the memoryOverhead in yarn mode. */
```
This replicates a lot of logic from elsewhere with hard-coded constants. Is it really important to vary it so finely and to add a whole new conf? It seems like this ought to be pretty independent of the environment, whether consolidating a buffer of size X is worthwhile.
@srowen Thanks, yes, I think you are right, we can just make it some fixed factor of the frame size.
After refining, the perf tests show that consolidation works well for reading a 1GB block with little extra memory (using consolidate(cIndex, numComponents) to avoid unnecessary consolidation of already consolidated components). Results with my laptop at low battery. Results with my laptop at normal battery.
```java
// to reduce memory consumption.
if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateFrameBufsDeltaThreshold) {
  int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
  frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
```
The logic here seems correct, but how is this different than just calling frameBuf.consolidate() without having to keep track of the component count in this class?
The no-argument consolidate() will do unnecessary consolidation of already consolidated components (i.e. there is always exactly one component after consolidation), which is slow and wastes memory. However, consolidate(cIndex, numComponents) will only consolidate the specified new components.
For instance, let's say we add 10 components and do a first consolidation; then we get one consolidated component. If we use consolidate(cIndex, numComponents) here, then the next time we consolidate after another 10 components are added, we do not need to re-consolidate the components that were already consolidated (no extra memory allocation and copy).
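A small illustration of the difference (a sketch assuming Netty's CompositeByteBuf API; the sizes are arbitrary):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class ConsolidateRangeDemo {
  public static void main(String[] args) {
    CompositeByteBuf buf = Unpooled.compositeBuffer(Integer.MAX_VALUE);

    addComponents(buf, 10);
    buf.consolidate();                       // copies everything: 10 components -> 1
    System.out.println(buf.numComponents()); // 1

    addComponents(buf, 10);
    // Only consolidate the 10 components added since the last consolidation;
    // the first consolidated component is left untouched (no re-copy).
    buf.consolidate(1, 10);
    System.out.println(buf.numComponents()); // 2

    buf.release();
  }

  private static void addComponents(CompositeByteBuf buf, int n) {
    for (int i = 0; i < n; i++) {
      ByteBuf c = Unpooled.buffer(1024);
      c.writerIndex(512);                    // half-filled component
      buf.addComponent(true, c);
    }
  }
}
```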
Ok, so in the end you don't end up with a single component, but with many components of size CONSOLIDATE_THRESHOLD each (minus the last one). I thought I saw the tests checking for a single component after consolidation, but may have misread.
Yes, that's it.
```java
// Reset buf and size for next frame.
ByteBuf frameBufCopy = frameBuf.duplicate();
frameBuf = null;
```
To follow up Sean's question, aren't you leaking frameBuf here now? You're returning a duplicate and not releasing this instance to decrement its ref count.
(Another way of saying that: returning the buffer itself is probably the right thing.)
No, frameBuf.duplicate() creates a derived buffer which shares the memory region of the parent buffer. A derived buffer does not have its own reference count; it shares the reference count of the parent buffer.
Here we can return a local variable that refers to the frameBuf object, and null out frameBuf for the next frame decoding.
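A quick way to see the shared reference count (a sketch using Netty's Unpooled API, not code from this patch):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DuplicateRefCntDemo {
  public static void main(String[] args) {
    ByteBuf parent = Unpooled.buffer(8).writeLong(42L);
    ByteBuf dup = parent.duplicate();    // derived buffer, shares memory and refCnt

    System.out.println(parent.refCnt()); // 1
    System.out.println(dup.refCnt());    // 1 -- same underlying counter

    dup.release();                       // decrements the shared count
    System.out.println(parent.refCnt()); // 0 -- parent is released too
  }
}
```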
```java
private int frameRemainingBytes = UNKNOWN_FRAME_SIZE;
private volatile Interceptor interceptor;

public TransportFrameDecoder() {
```
I thought you were going to make this configurable. Where are you reading the value from the configuration?
Now I think maybe we can just make it a fixed value; users are unlikely to change this threshold in most cases, and it requires little memory as shown in the newest test reports.
ok to test
Test build #102205 has finished for PR 23602 at commit
```java
}

@Test
public void testConsolidationForDecodingNonFullyWrittenByteBuf() {
```
If I understand correctly, this is testing that consolidation is reducing the amount of memory needed to hold a frame? But since you're writing just 1 MB to the decoder, that's not triggering consolidation, is it?
Playing with CompositeByteBuf, it adjusts the internal capacity based on the readable bytes of the components, but the component buffers remain unchanged, so still holding on to the original amount of memory:
```scala
scala> cb.numComponents()
res4: Int = 2

scala> cb.capacity()
res5: Int = 8

scala> cb.component(0).capacity()
res6: Int = 1048576
```
So I'm not sure this test is testing anything useful.
Also it would be nice not to use so many magic numbers.
@vanzin I think the test should be refined, but I have a question about your test.
CompositeByteBuf.capacity returns the last component's endOffset, so I think using the capacity for testing is OK.
https://github.com/netty/netty/blob/8fecbab2c56d3f49d0353d58ee1681f3e6d3feca/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java#L730
Maybe my question wasn't clear. I'm asking what part of Spark code is this test testing.
As far as I can see, it's testing netty code, and these are not netty unit tests.
@vanzin I think this test largely duplicates testConsolidationPerf; we can just remove it. I will update soon. Sorry for that.
```java
while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
  ByteBuf next = nextBufferForFrame(frameRemainingBytes);
  frameRemainingBytes -= next.readableBytes();
  frameBuf.addComponent(next).writerIndex(frameBuf.writerIndex() + next.readableBytes());
```
Not sure if that's a new call, but this can be frameBuf.addComponent(true, next)
@vanzin This is a copy of existing code. I can replace it with what you suggested.
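For reference, a minimal comparison of the two calls (a sketch assuming Netty 4.1's ByteBuf API; not code from this patch):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class AddComponentDemo {
  public static void main(String[] args) {
    ByteBuf next = Unpooled.buffer(16).writeLong(1L);   // 8 readable bytes

    // Existing form: add the component, then advance the writer index by hand.
    CompositeByteBuf a = Unpooled.compositeBuffer();
    a.addComponent(next.retainedDuplicate())
     .writerIndex(a.writerIndex() + next.readableBytes());

    // Suggested form: the boolean flag advances the writer index automatically.
    CompositeByteBuf b = Unpooled.compositeBuffer();
    b.addComponent(true, next.retainedDuplicate());

    System.out.println(a.readableBytes() + " == " + b.readableBytes()); // 8 == 8

    a.release();
    b.release();
    next.release();
  }
}
```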
Test build #102351 has finished for PR 23602 at commit

Test build #102377 has finished for PR 23602 at commit
```java
// Reset buf and size for next frame.
ByteBuf frame = frameBuf;
frameBuf = null;
nextFrameSize = UNKNOWN_FRAME_SIZE;
```
You have to reset consolidatedFrameBufSize and consolidatedNumComponents back to 0 for the next frame buffer.
Otherwise, after a very huge frame, all the smaller but still quite huge frames are not consolidated at all.
And when consolidation does kick in again (for a frame bigger than the previous maximum), only the components beyond the previous maximum get consolidated.
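A sketch of the kind of reset being asked for, continuing the snippet above (field names are taken from this PR's diff; the exact merged fix may differ):

```java
// Reset ALL per-frame state, including the consolidation bookkeeping,
// so the next frame starts consolidating from scratch.
ByteBuf frame = frameBuf;
frameBuf = null;
consolidatedFrameBufSize = 0;
consolidatedNumComponents = 0;
nextFrameSize = UNKNOWN_FRAME_SIZE;
return frame;
```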
@attilapiros Good catch! Thank you so much! I will fix it.
@attilapiros done!
I see you fixed this, but it should have been caught by unit tests. So there's probably a check missing in your tests (expected number of components?).
I think what's missing is not the check for the expected number of components but testing with multiple messages. Right now, within the loop body (where a new TransportFrameDecoder is created too), only one 1GB message is sent.
Yes, I can add some code to test multiple messages; we just need to do the same check on the consolidated buf capacity. I think this is more result-oriented.
Test build #102393 has finished for PR 23602 at commit

Test build #102409 has finished for PR 23602 at commit
```java
  totalBytesGot += buf.capacity();
}
assertEquals(numMessages, retained.size());
assertEquals(targetBytes * numMessages, totalBytesGot);
```
Does this mean this test now requires 3GB of memory just to store the data it's checking?
That seems wasteful. Either change the test to do checks after each separate message is written, or lower the size of the messages.
Done!
```java
decoder.channelRead(ctx, buf);
while (writtenBytes < targetBytes) {
  buf = Unpooled.buffer(pieceBytes * 2);
  ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
```
Just wanted to point out you're counting this allocation time in your performance measurement, which isn't optimal.
Done, thank you @vanzin
Test build #102552 has finished for PR 23602 at commit
retest this please

Test build #102758 has finished for PR 23602 at commit
retest this please

looks good pending tests (which failed last with an unrelated issue that should now be fixed).
Test build #102760 has finished for PR 23602 at commit
Merging to master.
What changes were proposed in this pull request?
Currently, TransportFrameDecoder does not consolidate the buffers read from the network, which may cause memory waste. In most cases a ByteBuf's writerIndex is far less than its capacity, so we can optimize this by doing consolidation.
This PR will do this optimization.
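As an illustration of the waste (a sketch assuming Netty's Unpooled API; the sizes are made up): each component can retain a backing buffer much larger than the bytes it actually holds.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class MemoryWasteDemo {
  public static void main(String[] args) {
    CompositeByteBuf frame = Unpooled.compositeBuffer(Integer.MAX_VALUE);
    long componentCapacity = 0;
    // Simulate frame decoding from socket reads: each read buffer is 64 KB,
    // but only a small part of it carries bytes for this frame.
    for (int i = 0; i < 16; i++) {
      ByteBuf read = Unpooled.buffer(64 * 1024);
      read.writerIndex(4 * 1024);            // only 4 KB of the 64 KB is used
      componentCapacity += read.capacity();
      frame.addComponent(true, read);
    }
    System.out.println("readable bytes: " + frame.readableBytes());  // 64 KB of payload
    System.out.println("retained capacity: " + componentCapacity);   // 1 MB held in components
    frame.release();
  }
}
```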
Related code:
spark/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
Line 143 in 9a30e23
How was this patch tested?
UT
Please review http://spark.apache.org/contributing.html before opening a pull request.