
Conversation

@liyezhang556520
Contributor

What changes were proposed in this pull request?

When netty transfers data that is not a FileRegion, the data is handled as a ByteBuf. If the data is large, this causes a significant performance problem: sun.nio.ch.IOUtil.write makes an internal memory copy of the buffer on every write, so the CPU sits at 100% while network throughput stays very low.

In this PR, if the data is large, we split it into small chunks when calling WritableByteChannel.write(), which avoids the wasted memory copies. Since such data cannot be written in a single write anyway, netty will call transferTo multiple times regardless.

How was this patch tested?

Spark unit tests and a manual test.
Manual test:
sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length

For more details, please refer to SPARK-14290
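
To make the idea concrete, here is a minimal sketch of the chunked write, assuming a helper shaped like MessageWithHeader.copyByteBuf (the class name, the exact limit, and the surrounding code are illustrative, not the actual diff; see the review below for how the limit was chosen):

    import io.netty.buffer.ByteBuf;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    // Sketch: expose at most NIO_BUFFER_LIMIT bytes of the heap ByteBuf per write,
    // so the temporary direct-buffer copy made inside sun.nio.ch.IOUtil.write stays
    // small. Netty's transferTo loop keeps calling back until the message is done.
    class ChunkedWriteSketch {
      private static final int NIO_BUFFER_LIMIT = 256 * 1024; // value debated below

      static int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
        int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
        ByteBuffer chunk = buf.nioBuffer(buf.readerIndex(), length);
        int written = target.write(chunk);
        buf.skipBytes(written);  // advance past whatever the channel accepted
        return written;
      }
    }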

* The size should not be too large, since too big a value wastes the underlying memory copy.
* e.g. if the network's available buffer is smaller than this limit, the data cannot be sent
* within a single write operation, yet a copy of this full size is still made each time.
*/
Contributor Author

I set this limit to 512K because, in my tests, each WritableByteChannel.write() can successfully write about 600KB ~ 1.5MB of data. This size needs to be validated with more tests by someone else.

Member

Is it possible to know the accurate number? I guess not, because it's OS dependent and may be changed via OS settings.

However, I saw that Hadoop uses private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.

Contributor

I'm also a little worried that 512k might be a bit too much. On my machine, /proc/sys/net/core/wmem_default is around 200k, which (I assume) means you'd be copying about half of the buffer needlessly here.

Instead, how about using a more conservative value (like Hadoop's), and looping in copyByteBuf until you either write the whole source buffer or get a short write?
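
A rough sketch of what that loop might look like (a hypothetical helper, not code from the PR; chunkSize stands for the conservative limit suggested above):

    import io.netty.buffer.ByteBuf;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    // Hypothetical loop: keep writing small slices of the source ByteBuf until it
    // is drained, or the channel accepts fewer bytes than offered (a short write),
    // in which case we stop and let netty retry on the next transferTo call.
    class LoopUntilShortWriteSketch {
      static long write(ByteBuf buf, WritableByteChannel target, int chunkSize)
          throws IOException {
        long total = 0;
        while (buf.readableBytes() > 0) {
          int length = Math.min(buf.readableBytes(), chunkSize);
          ByteBuffer chunk = buf.nioBuffer(buf.readerIndex(), length);
          int written = target.write(chunk);
          buf.skipBytes(written);
          total += written;
          if (written < length) {
            break; // short write: the socket send buffer is full
          }
        }
        return total;
      }
    }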

Member

I think too small a value will waste a lot of system calls. Our use case is different from Hadoop's: here we may send large messages.

Member

What if we create a DirectByteBuffer here manually for a big buf (big enough that we still benefit even though creating a direct buffer is slow) and try to write as much as possible? Then we can avoid the memory copy in IOUtil.write.
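
Roughly, that idea would look something like the fragment below (illustrative only, not part of this PR): copy the heap ByteBuf into a manually allocated direct ByteBuffer once, then write from it, so sun.nio.ch.IOUtil.write skips its internal copy. Real code would have to keep the direct buffer (and any unwritten bytes) across transferTo calls, which is where the pooling discussion below comes in.

    import io.netty.buffer.ByteBuf;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    // Illustrative only: one explicit heap -> direct copy, then writes go straight
    // from the direct buffer, so IOUtil.write does not copy again.
    class DirectBufferWriteSketch {
      static long write(ByteBuf buf, WritableByteChannel target) throws IOException {
        ByteBuffer direct = ByteBuffer.allocateDirect(buf.readableBytes());
        buf.readBytes(direct);  // the single explicit copy
        direct.flip();
        long written = 0;
        while (direct.hasRemaining()) {
          int n = target.write(direct);
          if (n == 0) {
            break; // socket send buffer full; a real impl must resume from 'direct' later
          }
          written += n;
        }
        return written;
      }
    }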

Contributor Author

Is it possible to know the accurate number? I guess not, because it's OS dependent and may be changed via OS settings.

@zsxwing There might be a way to get the accurate size of the network buffer, but I think doing so is pointless: even if we get the accurate number, we cannot guarantee that the network send buffer is empty each time we write, which means it is always possible that only part of the data gets written, whatever value we set NIO_BUFFER_LIMIT to. We can only say that the smaller NIO_BUFFER_LIMIT is, the less redundant copying is done.

Contributor Author

On my machine, /proc/sys/net/core/wmem_default is around 200k, which (I assume) means you'd be copying about half of the buffer needlessly here.

@vanzin, on my machine both wmem_default and wmem_max are also around 200K, but in my test I can successfully write more than 512K per WritableByteChannel.write() call; that size should match the return value of writeFromNativeBuffer, as in line http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#65. I don't know why. Can you run a test as well?

Contributor Author

What if we create a DirectByteBuffer here manually for a big buf (big enough that we still benefit even though creating a direct buffer is slow) and try to write as much as possible? Then we can avoid the memory copy in IOUtil.write.

@zsxwing, yes, the redundant copy can be avoided if we hand a direct buffer straight to WritableByteChannel.write(), because of the code in line http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#50, but I don't know whether that is worthwhile. IOUtil maintains a direct-buffer pool to avoid allocating direct buffers too frequently. I think that is why, in my tests, network throughput on the executor side was extremely low the first time I ran sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Long](1024 * 1024 * 200)).iterator).reduce((a,b)=> a).length, but was much higher when I ran it after first running sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length.

So, if we want to create direct buffers manually in Spark, we should also maintain a buffer pool, but that introduces much more complexity and carries the risk of memory leaks.

@liyezhang556520
Contributor Author

cc @rxin

@SparkQA

SparkQA commented Mar 31, 2016

Test build #54613 has finished for PR 12083 at commit 63ca85a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

@zsxwing

@vanzin
Contributor

vanzin commented Mar 31, 2016

This is a little unexpected; I'd expect that if there isn't enough buffer space in the WritableByteChannel, you'd get a short write and that's it. The code already takes care of that by keeping track of how many bytes were written, and a quick look at the netty code shows it does the same (it does spin a few times, by default 16, calling transferTo in a loop to see if it makes progress).

Do you know where that is breaking? Are we maybe failing to set some flag somewhere that properly configures the channels as non-blocking? Or is this an issue with the underlying WritableByteChannel implementation that ends up being used (and do you know what that implementation is)?

@zsxwing
Member

zsxwing commented Mar 31, 2016

@liyezhang556520 Could you point out the class calling IOUtil.write? The implementation I found is ByteArrayWritableChannel.write. It just calls ByteBuffer.get, which doesn't use a buffer.

Never mind. I found there is another implementation: sun.nio.ch.SocketChannelImpl.

@zsxwing
Member

zsxwing commented Mar 31, 2016

I just read the code here: http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#46

sun.nio.ch.IOUtil.write still needs the memory copy even for small chunks (anything that is not a DirectByteBuffer).

@liyezhang556520
Contributor Author

Hi @vanzin, the location of the memory copy was pointed out by @zsxwing; the call stack is as follows:

        at java.nio.Bits.copyFromArray(Bits.java:754)
        at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:371)
        at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:342)
        at sun.nio.ch.IOUtil.write(IOUtil.java:60)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
        - locked <0x00007f8a8a28d400> (a java.lang.Object)
        at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:131)
        at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:114)

The whole buffer is copied in line http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#60, but the buffer cannot be written out completely if its size is greater than the available underlying buffer's, as seen in line http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#65. So on each call we copy the whole input ByteBuf but write only part of it when the input is relatively large, which results in multiple unnecessary copies of the input ByteBuf.

This PR handles the issue the same way Hadoop does; please refer to https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java#L2957

@zsxwing
Member

zsxwing commented Apr 1, 2016

@liyezhang556520 thanks for clarifying. Now I think I understand this issue: writeFromNativeBuffer doesn't guarantee writing all bytes of the copied buffer (it is limited by the underlying OS buffer). So if we write a 1M buffer, it may only write NIO_BUFFER_LIMIT (512K), and we need to write the remaining 512K again; in that case we copy 1M + 512K bytes. If we divide the 1M buffer into 2 * 512K buffers, then we only need to copy 512K + 512K bytes. Is that correct?

@vanzin
Contributor

vanzin commented Apr 1, 2016

@liyezhang556520 ah, I see. Thanks for the pointer. So basically, if the source buffer is not a direct buffer, that class is making a copy of the whole source buffer before trying to write it to the channel. That's, uh, a little silly, but I guess it's something we have to live with...

@liyezhang556520
Contributor Author

So if we write a 1M buffer, it may only write NIO_BUFFER_LIMIT (512K), and we need to write the remaining 512K again; in that case we copy 1M + 512K bytes. If we divide the 1M buffer into 2 * 512K buffers, then we only need to copy 512K + 512K bytes. Is that correct?

@zsxwing, yes, that's right, so there will be a tremendous amount of copying if the data to be written is huge.

So basically, if the source buffer is not a direct buffer, that class is making a copy of the whole source buffer before trying to write it to the channel. That's, uh, a little silly, but I guess it's something we have to live with...

@vanzin Yes, we have to live with it if the buffer is not a direct buffer.

@liyezhang556520
Contributor Author

@zsxwing , @vanzin Any further comments?

@vanzin
Contributor

vanzin commented Apr 5, 2016

@liyezhang556520 I like the idea of eagerly copying into a direct buffer, but I understand that might be a lot of code for not much gain. I still think we should reduce that limit though - maybe 256k?

@liyezhang556520
Contributor Author

@vanzin, @zsxwing, I have changed the buffer limit to 256K. I agree that it would be better to handle this by manually copying the data into a direct buffer, so that no duplicate copy is made, but that introduces some code complexity. How about we merge this PR first, and later I create a new JIRA and submit a new PR that uses a direct ByteBuffer pool to avoid the duplicate memory copy? Once we use direct buffers, the code needs careful handling and should be well reviewed.

@SparkQA

SparkQA commented Apr 6, 2016

Test build #55082 has finished for PR 12083 at commit a793696.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liyezhang556520
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Apr 6, 2016

Test build #55099 has finished for PR 12083 at commit a793696.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liyezhang556520
Contributor Author

retest this please.

@SparkQA

SparkQA commented Apr 6, 2016

Test build #55102 has finished for PR 12083 at commit a793696.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Apr 6, 2016

LGTM

@vanzin
Contributor

vanzin commented Apr 6, 2016

Merging to master, thanks!

@asfgit asfgit closed this in c4bb02a Apr 6, 2016
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Apr 7, 2016
@davies
Contributor

davies commented Apr 11, 2016

@liyezhang556520 Could you send a patch for the 1.6 branch?

@liyezhang556520
Contributor Author

@davies , please see #12296
