Implement byte array reusage in nio transport #27051
This still needs some more granular testing. And some settings to disable byte array reuse. But I wanted to push it up here as a POC and get some feedback and see what others think about this approach before I continue to do more work.
This change has difficulties working with the test harness currently as we check breakers are at zero and pages have been released between each test. In this PR, pages can still be attached to active channels that live between tests causing assertions to fail. To work around, I disabled circuit breaking for channel buffers (as I'm not sure we would currently want that anyway). And I temporarily disabled checking pages released between tests (only afterClass). I made that change as I am interested in seeing a CI run without these failures. But that would probably not be the permanent solution.
s1monw
left a comment
I did a first pass. I like where it's going. Thanks for the iterations after our chat. Very helpful.
| @Override | ||
| protected void ensureBufferSpace(int expectedAdditions) { | ||
| try { | ||
| super.ensureBufferSpace(expectedAdditions); |
oh man that is terrible. We should contribute this back to hppc. Can you make sure we do this? I don't think it should ever catch OOM
oh and, good catch!
|
|
||
| @Override | ||
| public void close() { | ||
| if (recycler != null) { |
Use IOUtils#close
|
|
||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| public class BytesPage extends BytesArray implements Releasable { |
I think we can move this all into BytesArray instead? I don't think we need another class. just make the recycler optional as you did and we are good?
|
|
||
| /** | ||
| * Allocate a new {@link BytesPage}. | ||
| */ |
Now I am starting to wrap my head around this and I think I have an idea of how we can simplify this more. We currently use this concrete BytesPage instead of org.elasticsearch.common.util.ByteArray because ByteArray offers no access to the underlying bytes. Yet using ByteArray might have some beauty to it, especially since it allows us to have a single instance that we can simply resize by calling BigArrays#resize instead of doing all the accounting ourselves. So in order to do that I think we would need to add a couple of methods to ByteArray, for instance:
public interface ByteArray extends BigArray {
//... all the existing methods
// this sets the page for the given offset to the bytes ref without materializing it
// we can then in-turn wrap this with a ByteBuffer to read from NIO
public void getPageReference(int offset, BytesRef ref);
// releases all allocated byte pages until the given offset
// the bytes at the given offset are now accessible at offset `0`
public void shrinkTo(int offset);
// this returns a bytes reference to the given offset with the given length without releasing etc.
public BytesReference toBytesReference(int offset, int length);
}
This way I think we can keep all the logic of releasing and advancing in the ByteArray utils, and we are basically just moving along with the messages. I think this would make a lot of the code in here much simpler. WDYT?
I'm not really opposed to modifying ByteArray to fit our needs. I even considered using it when I first worked on this PR but I did not want to do much modification to it at the time, so I held off.
I think in regards to:
public BytesReference toBytesReference(int offset, int length);
Both of those pieces of functionality should be added pretty trivially using PagedBytesReference.
I don't really think public void getPageReference(int offset, BytesRef ref) is needed as PagedBytesReference returns a BytesRef iterator that will not materialize bytes. And most nio apis support vectorized operations.
shrinkTo is more complicated as it does not look like the ByteArray was designed to be shrunk. Nor is it a ring buffer which means that every time you slice a message off the head (and then shrink) you have to rearrange the stuff in the arrays.
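To make the shrinking concern concrete, here is a minimal sketch of the alternative shape: a buffer that keeps fixed-size pages in a queue and tracks an offset into the first page, so that dropping bytes from the head releases whole pages without copying the rest. `PageQueueBuffer`, its methods, and the `releaser` callback are hypothetical names for illustration; they are not classes from this PR or from Elasticsearch.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical illustration only: not the PR's ChannelBuffer and not ByteArray.
final class PageQueueBuffer {
    private final int pageSize;
    private final Consumer<byte[]> releaser;   // invoked once a page is fully consumed
    private final ArrayDeque<byte[]> pages = new ArrayDeque<>();
    private int headOffset = 0;                // bytes already consumed in the first page

    PageQueueBuffer(int pageSize, Consumer<byte[]> releaser) {
        this.pageSize = pageSize;
        this.releaser = releaser;
    }

    void addPage(byte[] page) {
        if (page.length != pageSize) {
            throw new IllegalArgumentException("page must be exactly " + pageSize + " bytes");
        }
        pages.addLast(page);
    }

    // Number of bytes still held, counted from the current head.
    long length() {
        return (long) pages.size() * pageSize - headOffset;
    }

    // Reads the byte at the given index relative to the current head.
    byte get(long index) {
        if (index < 0 || index >= length()) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        long absolute = headOffset + index;
        int pageIndex = (int) (absolute / pageSize);
        Iterator<byte[]> it = pages.iterator();
        byte[] page = it.next();
        for (int i = 0; i < pageIndex; i++) {
            page = it.next();
        }
        return page[(int) (absolute % pageSize)];
    }

    // Drops bytes from the head. Fully consumed pages go back to the releaser;
    // the remaining pages are never copied or rearranged.
    void releaseFromHead(long bytes) {
        if (bytes < 0 || bytes > length()) {
            throw new IllegalArgumentException("cannot release " + bytes + " bytes");
        }
        long absolute = headOffset + bytes;
        while (absolute >= pageSize) {
            releaser.accept(pages.removeFirst());
            absolute -= pageSize;
        }
        headOffset = (int) absolute;
    }
}
```

After `releaseFromHead(n)`, the byte that used to sit at index `n` is readable at index `0`, which is the behavior the proposed `shrinkTo` describes. The trade-off is that random access walks the queue, which a real implementation would likely want to avoid.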
| @Override | ||
| public void close() { | ||
| if (closed == false) { | ||
| channelBuffer.close(); |
maybe just protect channelBuffer from the side effects of being closed multiple times and remove the closed flag? It seems unused.
| try { | ||
| headOp.flush(); | ||
| } catch (IOException e) { | ||
| headOp.getListener().onFailure(e); |
maybe do the close in a finally if headOp.getListener().onFailure(e); barfs?
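The suggestion could look roughly like the sketch below. `WriteOp`, `FlushListener`, and `FlushHelper` are stand-in types invented for this example, assuming only the `flush()`, `getListener()`, `onFailure(e)`, and `close()` calls visible in the diff; the real types in the PR may differ.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-ins for the types used in the diff above.
interface FlushListener {
    void onFailure(Exception e);
}

interface WriteOp extends Closeable {
    void flush() throws IOException;

    FlushListener getListener();

    @Override
    void close(); // narrowed: releasing buffers does not throw a checked exception
}

final class FlushHelper {
    private FlushHelper() {}

    // Returns true if the flush succeeded. On failure the listener is notified,
    // and the operation is closed even if the listener itself throws.
    static boolean flushOrFail(WriteOp headOp) {
        try {
            headOp.flush();
            return true;
        } catch (IOException e) {
            try {
                headOp.getListener().onFailure(e);
            } finally {
                headOp.close();
            }
            return false;
        }
    }
}
```

With this shape an exception thrown by the listener still propagates to the caller, but the pages held by the operation are released first.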
|
|
||
| if (headOp.isFullyFlushed()) { | ||
| headOp.getListener().onResponse(channel); | ||
| headOp.close(); |
same here
|
|
||
| try { | ||
| BytesReference messageWithoutHeader = message.slice(6, message.length() - 6); | ||
| BytesReference messageWithoutHeader = messageBytes.slice(6, messageBytes.length() - 6); |
maybe move the two lines where you access the message into the try block
| import java.nio.ByteBuffer; | ||
| import java.util.Iterator; | ||
|
|
||
| public class NetworkBytesReference extends BytesReference { |
w00t!
| protected static void checkStaticState() throws Exception { | ||
| MockPageCacheRecycler.ensureAllPagesAreReleased(); | ||
| protected static void checkStaticState(boolean afterClass) throws Exception { | ||
| if (afterClass) { |
do we need this in general or can we maybe only do this in specific test cases?
I'm not sure I view this as a long term fix. I just wanted a full CI run without failures caused by holding bytes in between tests. We should maybe talk about the best way to deal with this.
ok understood. maybe mark it with //nocommit
|
|
||
| @Override | ||
| public void close() { | ||
| // TODO: Do we want to throw an exception to expose misuse? |
the Closeable contract says it's closeable multiple times so we are good I guess.
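For reference, `java.io.Closeable#close` is documented to have no effect when the resource is already closed. A minimal sketch of honoring that contract for a releasable page, using the `AtomicBoolean` the diff already imports, might look like this. `ReleaseOnce` and its `onRelease` callback are hypothetical names, not code from the PR.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration: runs the release action at most once,
// so repeated or concurrent close() calls are harmless.
final class ReleaseOnce implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable onRelease; // e.g. return the page to a recycler

    ReleaseOnce(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    @Override
    public void close() {
        // Only the first caller flips the flag and performs the release.
        if (closed.compareAndSet(false, true)) {
            onRelease.run();
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

Silently ignoring a second close follows the contract; throwing instead, as the TODO asks, would surface misuse but break callers that rely on idempotent close.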
I'm working on another PR related to integrating this with our recyclers. I don't need any reviews for this PR until that work is integrated.
I am closing this PR. We have refined the data structures a bit in #27551 and will eventually build something off that.
This commit implements a manner of reusing byte arrays when reading from
a nio channel. Specifically, it adds the ability to request a single
byte page from a BigArrays instance. When a byte page is requested, it
is added to a ChannelBuffer. Internally the byte pages are reference
counted to handle the case where a message causes a byte page to be
sliced in half.
Introducing new concepts (ChannelBuffer, NetworkBytesReference,
NetworkBytes) was necessary as our existing implementations
(BigByteArray, PagedBytesReference, various streams) do not interface
well with java nio.
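The reference counting described in the commit message can be sketched as follows. This is an assumption-laden illustration rather than the PR's implementation: `RefCountedPage`, `incRef`, `decRef`, and the `recycle` callback are hypothetical names. The idea is that each slice sharing a page holds one reference, and the page is recycled only when the last slice lets go.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a reference-counted byte page.
final class RefCountedPage {
    private final byte[] bytes;
    private final Runnable recycle;                       // returns the page for reuse
    private final AtomicInteger refCount = new AtomicInteger(1); // creator holds one reference

    RefCountedPage(byte[] bytes, Runnable recycle) {
        this.bytes = bytes;
        this.recycle = recycle;
    }

    byte[] bytes() {
        return bytes;
    }

    // Called when a second message slice starts sharing this page.
    void incRef() {
        for (;;) {
            int current = refCount.get();
            if (current <= 0) {
                throw new IllegalStateException("page already recycled");
            }
            // CAS loop so a page that already reached zero cannot be revived.
            if (refCount.compareAndSet(current, current + 1)) {
                return;
            }
        }
    }

    // Called when a slice is done; the last release recycles the page.
    void decRef() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) {
            recycle.run();
        } else if (remaining < 0) {
            throw new IllegalStateException("page released too many times");
        }
    }
}
```

When a message boundary falls in the middle of a page, the slice for the finished message and the slice for the next message would each hold a reference, so releasing the first message leaves the page alive for the second.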