Skip to content

Commit d52668d

Browse files
Simplify BytesReference StreamInput (#61681)
Flattening both streams into a single stream here saves a few objects and some indirection. This change also removes the redundant `offset` field, which added nothing but complexity by forcing two counters to be incremented on every read.
1 parent 36368b7 commit d52668d

File tree

2 files changed

+105
-183
lines changed

2 files changed

+105
-183
lines changed

server/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReference.java

Lines changed: 105 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.io.EOFException;
2727
import java.io.IOException;
28-
import java.io.InputStream;
2928
import java.io.OutputStream;
3029
import java.util.function.ToIntBiFunction;
3130

@@ -51,7 +50,7 @@ public int indexOf(byte marker, int from) {
5150

5251
/**
 * Opens a {@link StreamInput} over this bytes reference.
 * The removed row wrapped this instance in a MarkSupportingStreamInputWrapper; the added row
 * instantiates the inner BytesReferenceStreamInput class directly.
 *
 * @return a new stream positioned at the start of this reference
 * @throws IOException declared by the BytesReferenceStreamInput constructor
 */
@Override
5352
public StreamInput streamInput() throws IOException {
54-
return new MarkSupportingStreamInputWrapper(this);
53+
return new BytesReferenceStreamInput();
5554
}
5655

5756
@Override
@@ -181,61 +180,139 @@ private static void advance(final BytesRef ref, final int length) {
181180
ref.offset += length;
182181
}
183182

183+
/**
 * Writes this reference to the given builder as a single value.
 * The bytes are obtained through {@code toBytesRef()} and handed to
 * {@code builder.value(array, offset, length)}.
 *
 * @param builder the builder to write to
 * @param params  not used by this implementation
 * @return whatever {@code builder.value(...)} returns
 * @throws IOException if the builder fails to write the value
 */
@Override
184+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
185+
BytesRef bytes = toBytesRef();
186+
return builder.value(bytes.bytes, bytes.offset, bytes.length);
187+
}
188+
184189
/**
185-
* Instead of adding the complexity of {@link InputStream#reset()} etc to the actual impl
186-
* this wrapper builds it on top of the BytesReferenceStreamInput which is much simpler
187-
* that way.
190+
* A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide
191+
* generic stream access to {@link BytesReference} instances without materializing the
192+
* underlying bytes reference.
188193
*/
189-
private static final class MarkSupportingStreamInputWrapper extends StreamInput {
190-
// can't use FilterStreamInput it needs to reset the delegate
191-
private final BytesReference reference;
192-
private BytesReferenceStreamInput input;
194+
private final class BytesReferenceStreamInput extends StreamInput {
195+
196+
private BytesRefIterator iterator;
197+
private int sliceIndex;
198+
private BytesRef slice;
199+
private int sliceStartOffset; // the offset on the stream at which the current slice starts
200+
193201
private int mark = 0;
194202

195-
private MarkSupportingStreamInputWrapper(BytesReference reference) throws IOException {
196-
this.reference = reference;
197-
this.input = new BytesReferenceStreamInput(reference.iterator(), reference.length());
203+
/**
 * Creates a stream positioned at the first byte: obtains a fresh iterator, loads its first
 * slice and zeroes both position counters.
 * NOTE(review): iterator() is not declared in the visible part of this class; presumably it is
 * the enclosing reference's iterator() - confirm.
 *
 * @throws IOException if fetching the first slice from the iterator fails
 */
BytesReferenceStreamInput() throws IOException {
204+
this.iterator = iterator();
205+
// may be null if the iterator has no slices; the read methods check offset() against length() before touching it
this.slice = iterator.next();
206+
this.sliceStartOffset = 0;
207+
this.sliceIndex = 0;
198208
}
199209

200210
/**
 * Reads the next byte of the stream.
 * The removed row delegated to a wrapped stream; the added rows read directly from the current slice.
 *
 * @return the byte at the current position
 * @throws EOFException if the position has already reached length(), or no further slice is available
 * @throws IOException  if advancing to the next slice fails
 */
@Override
201211
public byte readByte() throws IOException {
202-
return input.readByte();
212+
if (offset() >= length()) {
213+
throw new EOFException();
214+
}
215+
// step over an exhausted (or empty) slice before indexing into it
maybeNextSlice();
216+
return slice.bytes[slice.offset + (sliceIndex++)];
217+
}
218+
219+
/**
 * Returns the current absolute read position in the stream: the stream offset at which the
 * current slice starts plus the index already consumed within that slice.
 */
private int offset() {
220+
return sliceStartOffset + sliceIndex;
221+
}
222+
223+
/**
 * Advances to the next slice when the current one has been fully consumed.
 * Loops so that zero-length slices are stepped over as well. Does nothing while unread bytes
 * remain in the current slice.
 *
 * @throws EOFException if the iterator has no further slice
 * @throws IOException  if the iterator fails to produce the next slice
 */
private void maybeNextSlice() throws IOException {
224+
while (sliceIndex == slice.length) {
225+
// account for the consumed slice in the absolute offset before replacing it
sliceStartOffset += sliceIndex;
226+
slice = iterator.next();
227+
sliceIndex = 0;
228+
if (slice == null) {
229+
throw new EOFException();
230+
}
231+
}
203232
}
204233

205234
@Override
206-
public void readBytes(byte[] b, int offset, int len) throws IOException {
207-
input.readBytes(b, offset, len);
235+
public void readBytes(byte[] b, int bOffset, int len) throws IOException {
236+
final int length = length();
237+
final int offset = offset();
238+
if (offset + len > length) {
239+
throw new IndexOutOfBoundsException(
240+
"Cannot read " + len + " bytes from stream with length " + length + " at offset " + offset);
241+
}
242+
read(b, bOffset, len);
208243
}
209244

210245
/**
 * Reads a single byte. The rows ending in "-" are the removed delegating read(byte[], int, int)
 * of the old wrapper; the added rows implement the single-byte read directly.
 *
 * @return the next byte as an unsigned value (0-255), or -1 once the position has reached length()
 * @throws IOException if reading the byte fails
 */
@Override
211-
public int read(byte[] b, int off, int len) throws IOException {
212-
return input.read(b, off, len);
246+
public int read() throws IOException {
247+
if (offset() >= length()) {
248+
return -1;
249+
}
250+
// widen without sign extension so that 0xFF is reported as 255 rather than -1
return Byte.toUnsignedInt(readByte());
213251
}
214252

215253
/**
 * Copies up to {@code len} bytes from the stream into {@code b}, starting at {@code bOffset},
 * crossing slice boundaries as needed. The rows ending in "-" are the removed delegating close()
 * of the old wrapper.
 *
 * @param b       destination array
 * @param bOffset index in {@code b} at which the first byte is stored
 * @param len     maximum number of bytes to copy
 * @return -1 if the position has already reached length(); otherwise the number of bytes copied,
 *         which is the smaller of {@code len} and the bytes remaining
 * @throws IOException if advancing to the next slice fails
 */
@Override
216-
public void close() throws IOException {
217-
input.close();
254+
public int read(final byte[] b, final int bOffset, final int len) throws IOException {
255+
final int length = length();
256+
final int offset = offset();
257+
if (offset >= length) {
258+
return -1;
259+
}
260+
final int numBytesToCopy = Math.min(len, length - offset);
261+
int remaining = numBytesToCopy; // copy the full length or the remaining part
262+
int destOffset = bOffset;
263+
while (remaining > 0) {
264+
maybeNextSlice();
265+
// copy no further than the end of the current slice; the next iteration moves on to the following one
final int currentLen = Math.min(remaining, slice.length - sliceIndex);
266+
assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen;
267+
System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen);
268+
destOffset += currentLen;
269+
remaining -= currentLen;
270+
sliceIndex += currentLen;
271+
assert remaining >= 0 : "remaining: " + remaining;
272+
}
273+
return numBytesToCopy;
218274
}
219275

220276
/**
 * Closing this stream is a no-op. The rows ending in "-" are the removed delegating read() of
 * the old wrapper.
 */
@Override
221-
public int read() throws IOException {
222-
return input.read();
277+
public void close() {
278+
// do nothing
223279
}
224280

225281
/**
 * Returns the number of bytes left between the current position and length().
 * The removed rows delegated to the wrapped stream; the added rows compute the value directly
 * and no longer declare IOException.
 */
@Override
226-
public int available() throws IOException {
227-
return input.available();
282+
public int available() {
283+
return length() - offset();
228284
}
229285

230286
/**
 * Verifies that at least {@code bytesToRead} bytes remain between the current position and length().
 * The removed rows delegated the check to the wrapped stream.
 *
 * @param bytesToRead number of bytes the caller is about to read
 * @throws EOFException if fewer than {@code bytesToRead} bytes remain
 */
@Override
231-
protected void ensureCanReadBytes(int length) throws EOFException {
232-
input.ensureCanReadBytes(length);
287+
protected void ensureCanReadBytes(int bytesToRead) throws EOFException {
288+
int bytesAvailable = length() - offset();
289+
if (bytesAvailable < bytesToRead) {
290+
throw new EOFException("tried to read: " + bytesToRead + " bytes but only " + bytesAvailable + " remaining");
291+
}
292+
}
293+
294+
@Override
295+
public long skip(long n) throws IOException {
296+
final int skip = (int) Math.min(Integer.MAX_VALUE, n);
297+
final int numBytesSkipped = Math.min(skip, length() - offset());
298+
int remaining = numBytesSkipped;
299+
while (remaining > 0) {
300+
maybeNextSlice();
301+
int currentLen = Math.min(remaining, slice.length - sliceIndex);
302+
remaining -= currentLen;
303+
sliceIndex += currentLen;
304+
assert remaining >= 0 : "remaining: " + remaining;
305+
}
306+
return numBytesSkipped;
233307
}
234308

235309
/**
 * Repositions the stream at the offset recorded by the last mark (0 if mark was never called).
 * The removed rows rebuilt the wrapped stream; the added rows rewind this stream in place by
 * fetching a fresh iterator, reloading the first slice, zeroing both counters and then skipping
 * forward by {@code mark} bytes.
 *
 * @throws IOException if fetching slices from the iterator fails
 */
@Override
236310
public void reset() throws IOException {
237-
input = new BytesReferenceStreamInput(reference.iterator(), reference.length());
238-
input.skip(mark);
311+
iterator = iterator();
312+
slice = iterator.next();
313+
sliceStartOffset = 0;
314+
sliceIndex = 0;
315+
skip(mark);
239316
}
240317

241318
@Override
@@ -247,18 +324,7 @@ public boolean markSupported() {
247324
// Records the current stream offset so that reset() can return to it. The rows ending in "-" below
// are the removed old assignment and the removed delegating skip(long) of the old wrapper.
public void mark(int readLimit) {
248325
// readLimit is only a lower bound: the stream must remember at least that many bytes after the mark,
249326
// but it may remember more, which is the case here because only the offset is recorded.
250-
this.mark = input.getOffset();
251-
}
252-
253-
@Override
254-
public long skip(long n) throws IOException {
255-
return input.skip(n);
327+
this.mark = offset();
256328
}
257329
}
258-
259-
@Override
260-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
261-
BytesRef bytes = toBytesRef();
262-
return builder.value(bytes.bytes, bytes.offset, bytes.length);
263-
}
264330
}

server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java

Lines changed: 0 additions & 144 deletions
This file was deleted.

0 commit comments

Comments
 (0)