Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,34 @@
class ByteBufStreamInput extends StreamInput {

private final ByteBuf buffer;
private final int startIndex;
private final int endIndex;

ByteBufStreamInput(ByteBuf buffer, int length) {
if (length > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
this.buffer = buffer;
startIndex = buffer.readerIndex();
int startIndex = buffer.readerIndex();
endIndex = startIndex + length;
buffer.markReaderIndex();
}

@Override
public BytesReference readBytesReference(int length) throws IOException {
BytesReference ref = Netty4Utils.toBytesReference(buffer.slice(buffer.readerIndex(), length));
buffer.skipBytes(length);
return ref;
// NOTE: It is unsafe to share a reference of the internal structure, so we
// use the default implementation which will copy the bytes. It is unsafe because
// a netty ByteBuf might be pooled which requires a manual release to prevent
// memory leaks.
return super.readBytesReference(length);
}

@Override
public BytesRef readBytesRef(int length) throws IOException {
if (!buffer.hasArray()) {
return super.readBytesRef(length);
}
BytesRef bytesRef = new BytesRef(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), length);
buffer.skipBytes(length);
return bytesRef;
// NOTE: It is unsafe to share a reference of the internal structure, so we
// use the default implementation which will copy the bytes. It is unsafe because
// a netty ByteBuf might be pooled which requires a manual release to prevent
// memory leaks.
return super.readBytesRef(length);
}

@Override
Expand Down