-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Introduce resizable inbound byte buffer #27551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This is mostly proof of conception. I still need to add some more tests. But I wanted to see if this is more in the direction we wanted to go. @s1monw In relation to this #27221, the
|
|
@s1monw After thinking about this a little bit, I think that the work from |
|
@s1monw I went ahead and moved all of the work related to writing bytes into the WriteOperation. |
s1monw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like where this is going, I would appreciate if you could put more inline comment etc in so it would be easier to read
|
|
||
| import java.nio.ByteBuffer; | ||
|
|
||
| public class ByteBufferReference extends BytesReference { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have some javadocs?
| } | ||
| } | ||
|
|
||
| public void releasePagesFromHead(int bytesToRelease) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we call it just release and always make it implicit that it's on the front?
| @Override | ||
| public BytesReference slice(int from, int length) { | ||
| if (from < 0 || (from + length) > this.length) { | ||
| throw new IllegalArgumentException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can rather throw and IndexOutOfBoundsException here?
|
|
||
| public ByteBuffer[] getPreIndexBuffers() { | ||
| if (internalIndex == 0) { | ||
| return new ByteBuffer[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe assign this to a static constant?
| offset = indexInPage(bytesToRelease + offset); | ||
| } | ||
|
|
||
| public ByteBuffer[] getPreIndexBuffers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you document this, I am having trouble to understand waht that means
| private final Supplier<Page> pageSupplier; | ||
|
|
||
| private int capacity = 0; | ||
| private int internalIndex = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we rather use longs here since we can grow beyond an array an if not can we add safety guards to ensure it never does?
| return buffers; | ||
| } | ||
|
|
||
| public ByteBuffer[] getPostIndexBuffers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, I need docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be easier to have a single method that allows you to get a slice like ByteBuffer[] slice(long start, long length); then the caller can decide what is needed? this would remove the need for two methods...
| return buffers; | ||
| } | ||
|
|
||
| public void incrementIndex(int delta) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it only go forward? should we enforce it is positive?
| return capacity - internalIndex; | ||
| } | ||
|
|
||
| private int numPages(int capacity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
up to you but maybe inline all these tiny methods? really up to you
| return index & pageMask; | ||
| } | ||
|
|
||
| public static class Page implements Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a preparation for the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. As a different Supplier would need access. But I'll go ahead and change it to private right now. And can adjust in whenever the "recycling" pr lands.
|
@s1monw I made some changes to address your review comments. |
s1monw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did another pass but we are quite close. Looks good
| * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can | ||
| * be called and the buffer will expand using the supplier provided. | ||
| */ | ||
| public class InboundChannelBuffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be final? and pkg private?
| return buffers; | ||
| } | ||
|
|
||
| public ByteBuffer[] getPostIndexBuffers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be easier to have a single method that allows you to get a slice like ByteBuffer[] slice(long start, long length); then the caller can decide what is needed? this would remove the need for two methods...
| private int indexInPage(int index) { | ||
| return index & pageMask; | ||
| private int indexInPage(long index) { | ||
| return (int) (index & pageMask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually use Math.toIntExact() for this just to have extra protection but I think it's ok here
|
|
||
| private long capacity = 0; | ||
| private long internalIndex = 0; | ||
| private int offset = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the offset into the last buffer? can you leave a comment on it why it's an int
| if (last == null || last.hasWriteRemaining() == false) { | ||
| this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH]))); | ||
| if (channelBuffer.getRemaining() == 0) { | ||
| channelBuffer.ensureCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if InboundChannelBuffer.PAGE_SIZE should be externally visible. Usually the contract of ensureCapacity is to guarantee some kind of capacity. it's up the the impl to ensure that we overallocate with certain page sizes. I think this call should be like this channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1) and internally we can make sure we overallocate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then InboundChannelBuffer.PAGE_SIZE can be private
| } | ||
|
|
||
| int pagesToRelease = pageIndex(offset + bytesToRelease); | ||
| for (int i = 0; i < pagesToRelease; ++i) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/++i/i++
| */ | ||
| public ByteBuffer[] getPostIndexBuffers() { | ||
| if (internalIndex == capacity) { | ||
| return new ByteBuffer[0]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the constant?
| return (int) (index & pageMask); | ||
| } | ||
|
|
||
| private static class Page implements Releasable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we maybe remove this class all-together since we don't use it and can we introduce it once it's needed?
| public static final int PAGE_SIZE = 1 << 14; | ||
| private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; | ||
|
|
||
| private final int pageMask; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both pageMask and pageShift can be constants right?
| private final int pageMask; | ||
| private final int pageShift; | ||
|
|
||
| private final ArrayDeque<Page> pages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perfect application for an ArrayDeque!
|
@s1monw I have pushed changes to address your review comments. |
s1monw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM left one suggestion
| return capacity; | ||
| } | ||
|
|
||
| public long getRemaining() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a suggestion, can we assert that this is actually >= 0?
This is related to #27563. In order to interface with java nio, we must have buffers that are compatible with ByteBuffer. This commit introduces a basic ByteBufferReference to easily allow transferring bytes off the wire to usage in the application. Additionally it introduces an InboundChannelBuffer. This is a buffer that can internally expand as more space is needed. It is designed to be integrated with a page recycler so that it can internally reuse pages. The final piece is moving all of the index work for writing bytes to a channel into the WriteOperation.
This is a followup to elastic#27551. That commit introduced a bug where the incorrect byte buffers would be returned when we attempted a write. This commit fixes the logic.
This is a followup to #27551. That commit introduced a bug where the incorrect byte buffers would be returned when we attempted a write. This commit fixes the logic.
This is a followup to #27551. That commit introduced a bug where the incorrect byte buffers would be returned when we attempted a write. This commit fixes the logic.
This is related to #27563. In order to interface with java nio, we must
have buffers that are compatible with
ByteBuffer. This commit introducesa basic
ByteBufferReferenceto easily allow transferring bytes off thewire to usage in the application.
Additionally it introduces an
InboundChannelBuffer. This is a bufferthat can internally expand as more space is needed. It is designed to
be integrated with a page recycler so that it can internally reuse pages.
The final piece is moving all of the index work for writing bytes to a
channel into the
WriteOperation.