From f469ff4bddbdff2ff5d87cedac248893e1047ec3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 27 Nov 2017 15:49:48 -0700 Subject: [PATCH 01/14] WIP --- .../common/bytes/ByteBufferReference.java | 74 ++++++++ .../bytes/ByteBufferReferenceTests.java | 47 +++++ .../transport/nio/InboundChannelBuffer.java | 166 ++++++++++++++++++ .../transport/nio/NetworkBytesReference.java | 157 ----------------- .../transport/nio/OutboundChannelBytes.java | 72 ++++++++ .../transport/nio/WriteOperation.java | 23 +-- .../nio/channel/NioSocketChannel.java | 26 ++- .../nio/channel/TcpFrameDecoder.java | 6 +- .../transport/nio/channel/TcpReadContext.java | 57 ++---- .../nio/ByteBufferReferenceTests.java | 155 ---------------- .../nio/InboundChannelBufferTests.java | 95 ++++++++++ .../nio/SocketEventHandlerTests.java | 6 +- .../transport/nio/SocketSelectorTests.java | 6 +- .../transport/nio/WriteOperationTests.java | 10 +- .../nio/channel/TcpFrameDecoderTests.java | 18 +- .../nio/channel/TcpReadContextTests.java | 26 +-- .../nio/channel/TcpWriteContextTests.java | 4 +- 17 files changed, 532 insertions(+), 416 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java create mode 100644 core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java create mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java delete mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java create mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java delete mode 100644 test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java create mode 100644 test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java new file mode 100644 index 0000000000000..d21d2bb13209e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; + +import java.nio.ByteBuffer; + +public class ByteBufferReference extends BytesReference { + + private final ByteBuffer buffer; + private final int offset; + private final int length; + + public ByteBufferReference(ByteBuffer buffer) { + this.buffer = buffer; + this.offset = buffer.position(); + this.length = buffer.remaining(); + } + + @Override + public byte get(int index) { + return buffer.get(index + offset); + } + + @Override + public int length() { + return length; + } + + @Override + public BytesReference slice(int from, int length) { + if (from < 0 || (from + length) > this.length) { + throw new IllegalArgumentException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" + + from + "], length [" + length + "]"); + } + ByteBuffer newByteBuffer = buffer.duplicate(); + newByteBuffer.position(offset + from); + newByteBuffer.limit(offset + from + length); + return new ByteBufferReference(newByteBuffer); + } + + @Override + public BytesRef toBytesRef() { + if (buffer.hasArray()) { + return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length); + } + final byte[] copy = new byte[length]; + buffer.get(copy, offset, length); + return new BytesRef(copy); + } + + @Override + public long ramBytesUsed() { + return buffer.capacity(); + } +} diff --git a/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java new file mode 100644 index 0000000000000..45fcacaf8b55b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class ByteBufferReferenceTests extends AbstractBytesReferenceTestCase { + + private void initializeBytes(byte[] bytes) { + for (int i = 0 ; i < bytes.length; ++i) { + bytes[i] = (byte) i; + } + } + + @Override + protected BytesReference newBytesReference(int length) throws IOException { + int offset = randomInt(27); + byte[] bytes = new byte[length + offset]; + initializeBytes(bytes); + return new ByteBufferReference(ByteBuffer.wrap(bytes, offset, length)); + } + + @Override + protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException { + byte[] bytes = new byte[length]; + initializeBytes(bytes); + return new ByteBufferReference(ByteBuffer.wrap(bytes)); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java new file mode 100644 index 0000000000000..aaca053275365 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.elasticsearch.common.lease.Releasable; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.function.Supplier; + +public class InboundChannelBuffer { + + public static final int PAGE_SIZE = 1 << 14; + + private final int pageMask; + private final int pageShift; + + private final ArrayDeque pages; + private final Supplier pageSupplier; + + private int capacity = 0; + private int internalIndex = 0; + private int offset = 0; + + public InboundChannelBuffer() { + this(() -> new Page(ByteBuffer.wrap(new byte[PAGE_SIZE]), () -> {})); + } + + public InboundChannelBuffer(Supplier pageSupplier) { + this.pageSupplier = pageSupplier; + this.pages = new ArrayDeque<>(); + this.pageMask = PAGE_SIZE - 1; + this.pageShift = Integer.numberOfTrailingZeros(PAGE_SIZE); + this.capacity = PAGE_SIZE * pages.size(); + expandCapacity(PAGE_SIZE); + } + + public void expandCapacity(int newCapacity) { + if (capacity < newCapacity) { + int numPages = numPages(newCapacity + offset); + int pagesToAdd = numPages - pages.size(); + for (int i = 0; i < pagesToAdd; ++i) { + pages.addLast(pageSupplier.get()); + } + capacity += pagesToAdd * PAGE_SIZE; + } + } + + public void releasePagesFromHead(int bytesToRelease) { + if (bytesToRelease > capacity) { + throw new IllegalArgumentException("Releasing more bytes than allocated."); + } + + int pagesToRelease = pageIndex(offset + bytesToRelease); + for (int i = 0; i < pagesToRelease; ++i) { + Page page = pages.removeFirst(); + page.close(); + } + capacity -= bytesToRelease; + internalIndex = Math.max(internalIndex - bytesToRelease, 0); + offset = indexInPage(bytesToRelease + offset); + } + + public ByteBuffer[] getPreIndexBuffers() { + int indexWithOffset = internalIndex + offset; + int pageCount = pageIndex(indexWithOffset); + int finalLimit = indexInPage(indexWithOffset); + if (finalLimit != 0) { + pageCount += 1; + } + + ByteBuffer[] buffers = new ByteBuffer[pageCount]; + Iterator pageIterator = pages.iterator(); + ByteBuffer firstBuffer = pageIterator.next().buffer.duplicate(); + firstBuffer.position(firstBuffer.position() + offset); + buffers[0] = firstBuffer; + for (int i = 1; i < buffers.length; ++i) { + buffers[i] = pageIterator.next().buffer.duplicate(); + } + if (finalLimit != 0) { + buffers[buffers.length - 1].limit(finalLimit); + } + + return buffers; + } + + public ByteBuffer[] getPostIndexBuffers() { + int indexWithOffset = offset + internalIndex; + int pageIndex = pageIndex(indexWithOffset); + int indexInPage = indexInPage(indexWithOffset); + + ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; + Iterator pageIterator = pages.descendingIterator(); + for (int i = buffers.length - 1; i > 0; --i) { + buffers[i] = pageIterator.next().buffer.duplicate(); + } + ByteBuffer firstPostIndexBuffer = pageIterator.next().buffer.duplicate(); + firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); + buffers[0] = firstPostIndexBuffer; + + return buffers; + } + + public void incrementIndex(int delta) { + internalIndex += delta; + } + + public int getIndex() { + return internalIndex; + } + + public int getCapacity() { + return capacity; + } + + private int numPages(int capacity) { + return (capacity + pageMask) >>> pageShift; + } + + private int pageIndex(int index) { + return index >>> pageShift; + } + + private int indexInPage(int index) { + return index & pageMask; + } + + public int getOffset() { + return offset; + } + + public static class Page implements Releasable { + + private final ByteBuffer buffer; + private final Releasable releasable; + + + public Page(ByteBuffer buffer, Releasable releasable) { + this.buffer = buffer; + this.releasable = releasable; + } + + @Override + public void close() { + releasable.close(); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java deleted file mode 100644 index cbccd7333d62b..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio; - -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -public class NetworkBytesReference extends BytesReference { - - private final BytesArray bytesArray; - private final ByteBuffer writeBuffer; - private final ByteBuffer readBuffer; - - private int writeIndex; - private int readIndex; - - public NetworkBytesReference(BytesArray bytesArray, int writeIndex, int readIndex) { - this.bytesArray = bytesArray; - this.writeIndex = writeIndex; - this.readIndex = readIndex; - this.writeBuffer = ByteBuffer.wrap(bytesArray.array()); - this.readBuffer = ByteBuffer.wrap(bytesArray.array()); - } - - public static NetworkBytesReference wrap(BytesArray bytesArray) { - return wrap(bytesArray, 0, 0); - } - - public static NetworkBytesReference wrap(BytesArray bytesArray, int writeIndex, int readIndex) { - if (readIndex > writeIndex) { - throw new IndexOutOfBoundsException("Read index [" + readIndex + "] was greater than write index [" + writeIndex + "]"); - } - return new NetworkBytesReference(bytesArray, writeIndex, readIndex); - } - - @Override - public byte get(int index) { - return bytesArray.get(index); - } - - @Override - public int length() { - return bytesArray.length(); - } - - @Override - public NetworkBytesReference slice(int from, int length) { - BytesReference ref = bytesArray.slice(from, length); - BytesArray newBytesArray; - if (ref instanceof BytesArray) { - newBytesArray = (BytesArray) ref; - } else { - newBytesArray = new BytesArray(ref.toBytesRef()); - } - - int newReadIndex = Math.min(Math.max(readIndex - from, 0), length); - int newWriteIndex = Math.min(Math.max(writeIndex - from, 0), length); - - return wrap(newBytesArray, newWriteIndex, newReadIndex); - } - - @Override - public BytesRef toBytesRef() { - return bytesArray.toBytesRef(); - } - - @Override - public long ramBytesUsed() { - return bytesArray.ramBytesUsed(); - } - - public int getWriteIndex() { - return writeIndex; - } - - public void incrementWrite(int delta) { - int newWriteIndex = writeIndex + delta; - if (newWriteIndex > bytesArray.length()) { - throw new IndexOutOfBoundsException("New write index [" + newWriteIndex + "] would be greater than length" + - " [" + bytesArray.length() + "]"); - } - - writeIndex = newWriteIndex; - } - - public int getWriteRemaining() { - return bytesArray.length() - writeIndex; - } - - public boolean hasWriteRemaining() { - return getWriteRemaining() > 0; - } - - public int getReadIndex() { - return readIndex; - } - - public void incrementRead(int delta) { - int newReadIndex = readIndex + delta; - if (newReadIndex > writeIndex) { - throw new IndexOutOfBoundsException("New read index [" + newReadIndex + "] would be greater than write" + - " index [" + writeIndex + "]"); - } - readIndex = newReadIndex; - } - - public int getReadRemaining() { - return writeIndex - readIndex; - } - - public boolean hasReadRemaining() { - return getReadRemaining() > 0; - } - - public ByteBuffer getWriteByteBuffer() { - writeBuffer.position(bytesArray.offset() + writeIndex); - writeBuffer.limit(bytesArray.offset() + bytesArray.length()); - return writeBuffer; - } - - public ByteBuffer getReadByteBuffer() { - readBuffer.position(bytesArray.offset() + readIndex); - readBuffer.limit(bytesArray.offset() + writeIndex); - return readBuffer; - } - - public static void vectorizedIncrementReadIndexes(Iterable references, int delta) { - Iterator refs = references.iterator(); - while (delta != 0) { - NetworkBytesReference ref = refs.next(); - int amountToInc = Math.min(ref.getReadRemaining(), delta); - ref.incrementRead(amountToInc); - delta -= amountToInc; - } - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java new file mode 100644 index 0000000000000..98117c6494ae9 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class OutboundChannelBytes { + + private final ByteBuffer[] buffers; + private final int[] offsets; + private final int length; + private int internalIndex; + + public OutboundChannelBytes(ByteBuffer[] buffers) { + this.buffers = buffers; + this.offsets = new int[buffers.length]; + int offset = 0; + for (int i = 0; i < buffers.length; i++) { + ByteBuffer buffer = buffers[i]; + offsets[i] = offset; + offset += buffer.remaining(); + } + length = offset; + } + + public ByteBuffer[] getPostIndexBuffers() { + int offsetIndex = getOffsetIndex(internalIndex); + + ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex]; + + ByteBuffer firstBuffer = buffers[0].duplicate(); + firstBuffer.position(internalIndex - offsets[offsetIndex]); + postIndexBuffers[offsetIndex] = firstBuffer; + int j = 1; + for (int i = (offsetIndex + 1); i < buffers.length; ++i) { + postIndexBuffers[j++] = buffers[i].duplicate(); + } + + return postIndexBuffers; + } + + public void incrementIndex(int delta) { + internalIndex += delta; + } + + public int getRemaining() { + return length - internalIndex; + } + + private int getOffsetIndex(int offset) { + final int i = Arrays.binarySearch(offsets, offset); + return i < 0 ? (-(i + 1)) - 1 : i; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index f91acc5bbea3a..628bce42fc7bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -27,22 +27,23 @@ import org.elasticsearch.transport.nio.channel.NioSocketChannel; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; public class WriteOperation { private final NioSocketChannel channel; private final ActionListener listener; - private final NetworkBytesReference[] references; + private final OutboundChannelBytes outboundBytes; public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { this.channel = channel; this.listener = listener; - this.references = toArray(bytesReference); + this.outboundBytes = toChannelBytes(bytesReference); } - public NetworkBytesReference[] getByteReferences() { - return references; + public OutboundChannelBytes getByteReferences() { + return outboundBytes; } public ActionListener getListener() { @@ -54,23 +55,23 @@ public NioSocketChannel getChannel() { } public boolean isFullyFlushed() { - return references[references.length - 1].hasReadRemaining() == false; + return outboundBytes.getRemaining() == 0; } public int flush() throws IOException { - return channel.write(references); + return channel.write(outboundBytes); } - private static NetworkBytesReference[] toArray(BytesReference reference) { + private static OutboundChannelBytes toChannelBytes(BytesReference reference) { BytesRefIterator byteRefIterator = reference.iterator(); BytesRef r; try { - // Most network messages are composed of three buffers - ArrayList references = new ArrayList<>(3); + // Most network messages are composed of three buffers. + ArrayList buffers = new ArrayList<>(3); while ((r = byteRefIterator.next()) != null) { - references.add(NetworkBytesReference.wrap(new BytesArray(r), r.length, 0)); + buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length)); } - return references.toArray(new NetworkBytesReference[references.size()]); + return new OutboundChannelBytes(buffers.toArray(new ByteBuffer[buffers.size()])); } catch (IOException e) { // this is really an error since we don't do IO in our bytesreferences diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index b56731aee10b2..0fae413cd2b66 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -20,7 +20,8 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; +import org.elasticsearch.transport.nio.OutboundChannelBytes; import org.elasticsearch.transport.nio.SocketSelector; import java.io.IOException; @@ -28,7 +29,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -66,34 +66,32 @@ public SocketSelector getSelector() { return socketSelector; } - public int write(NetworkBytesReference[] references) throws IOException { + public int write(OutboundChannelBytes channelBytes) throws IOException { + ByteBuffer[] writeBuffers = channelBytes.getPostIndexBuffers(); int written; - if (references.length == 1) { - written = socketChannel.write(references[0].getReadByteBuffer()); + if (writeBuffers.length == 1) { + written = socketChannel.write(writeBuffers[0]); } else { - ByteBuffer[] buffers = new ByteBuffer[references.length]; - for (int i = 0; i < references.length; ++i) { - buffers[i] = references[i].getReadByteBuffer(); - } - written = (int) socketChannel.write(buffers); + written = (int) socketChannel.write(writeBuffers); } + if (written <= 0) { return written; } - NetworkBytesReference.vectorizedIncrementReadIndexes(Arrays.asList(references), written); + channelBytes.incrementIndex(written); return written; } - public int read(NetworkBytesReference reference) throws IOException { - int bytesRead = socketChannel.read(reference.getWriteByteBuffer()); + public int read(InboundChannelBuffer buffer) throws IOException { + int bytesRead = (int) socketChannel.read(buffer.getPostIndexBuffers()); if (bytesRead == -1) { return bytesRead; } - reference.incrementWrite(bytesRead); + buffer.incrementIndex(bytesRead); return bytesRead; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java index 356af44c5ba88..b2ba70fb2365d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java @@ -36,11 +36,11 @@ public class TcpFrameDecoder { private int expectedMessageLength = -1; - public BytesReference decode(BytesReference bytesReference, int currentBufferSize) throws IOException { - if (currentBufferSize >= 6) { + public BytesReference decode(BytesReference bytesReference) throws IOException { + if (bytesReference.length() >= 6) { int messageLength = readHeaderBuffer(bytesReference); int totalLength = messageLength + HEADER_SIZE; - if (totalLength > currentBufferSize) { + if (totalLength > bytesReference.length()) { expectedMessageLength = totalLength; return null; } else if (totalLength == bytesReference.length()) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 8eeb32a976cac..00a6a0b501699 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -19,25 +19,21 @@ package org.elasticsearch.transport.nio.channel; -import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; +import java.nio.ByteBuffer; public class TcpReadContext implements ReadContext { - private static final int DEFAULT_READ_LENGTH = 1 << 14; - private final TcpReadHandler handler; private final TcpNioSocketChannel channel; private final TcpFrameDecoder frameDecoder; - private final LinkedList references = new LinkedList<>(); - private int rawBytesCount = 0; + private final InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler) { this((TcpNioSocketChannel) channel, handler, new TcpFrameDecoder()); @@ -47,33 +43,26 @@ public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFr this.handler = handler; this.channel = channel; this.frameDecoder = frameDecoder; - this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH]))); } @Override public int read() throws IOException { - NetworkBytesReference last = references.peekLast(); - if (last == null || last.hasWriteRemaining() == false) { - this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH]))); + if ((channelBuffer.getCapacity() - channelBuffer.getIndex()) == 0) { + channelBuffer.expandCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); } - int bytesRead = channel.read(references.getLast()); + int bytesRead = channel.read(channelBuffer); if (bytesRead == -1) { return bytesRead; } - rawBytesCount += bytesRead; - BytesReference message; // Frame decoder will throw an exception if the message is improperly formatted, the header is incorrect, // or the message is corrupted - while ((message = frameDecoder.decode(createCompositeBuffer(), rawBytesCount)) != null) { + while ((message = frameDecoder.decode(toBytesReference(channelBuffer))) != null) { int messageLengthWithHeader = message.length(); - NetworkBytesReference.vectorizedIncrementReadIndexes(references, messageLengthWithHeader); - trimDecodedMessages(messageLengthWithHeader); - rawBytesCount -= messageLengthWithHeader; try { BytesReference messageWithoutHeader = message.slice(6, message.length() - 6); @@ -84,32 +73,22 @@ public int read() throws IOException { } } catch (Exception e) { handler.handleException(channel, e); + } finally { + channelBuffer.releasePagesFromHead(messageLengthWithHeader); } } return bytesRead; } - private CompositeBytesReference createCompositeBuffer() { - return new CompositeBytesReference(references.toArray(new BytesReference[references.size()])); - } - - private void trimDecodedMessages(int bytesToTrim) { - while (bytesToTrim != 0) { - NetworkBytesReference ref = references.getFirst(); - int readIndex = ref.getReadIndex(); - bytesToTrim -= readIndex; - if (readIndex == ref.length()) { - references.removeFirst(); - } else { - assert bytesToTrim == 0; - if (readIndex != 0) { - references.removeFirst(); - NetworkBytesReference slicedRef = ref.slice(readIndex, ref.length() - readIndex); - references.addFirst(slicedRef); - } - } - + private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) { + ByteBuffer[] writtenToBuffers = channelBuffer.getPreIndexBuffers(); + ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length]; + for (int i = 0; i listener; - private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1])); + private BytesReference bufferReference = new BytesArray(new byte[1]); private Selector rawSelector; @Before @@ -294,8 +295,7 @@ public void testCleanup() throws Exception { socketSelector.preSelect(); - NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(new BytesArray(new byte[1])); - socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener)); + socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), new BytesArray(new byte[1]), listener)); socketSelector.scheduleForRegistration(unRegisteredChannel); TestSelectionKey testSelectionKey = new TestSelectionKey(0); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java index 351ac87eb561e..51d54a440db24 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java @@ -49,8 +49,8 @@ public void testFlush() throws IOException { when(channel.write(any())).thenAnswer(invocationOnMock -> { - NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0]; - refs[0].incrementRead(10); + OutboundChannelBytes refs = (OutboundChannelBytes) invocationOnMock.getArguments()[0]; + refs.incrementIndex(10); return 10; }); @@ -63,14 +63,14 @@ public void testPartialFlush() throws IOException { WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); when(channel.write(any())).thenAnswer(invocationOnMock -> { - NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0]; - refs[0].incrementRead(5); + OutboundChannelBytes refs = (OutboundChannelBytes) invocationOnMock.getArguments()[0]; + refs.incrementIndex(5); return 5; }); writeOp.flush(); assertFalse(writeOp.isFullyFlushed()); - assertEquals(5, writeOp.getByteReferences()[0].getReadRemaining()); + assertEquals(5, writeOp.getByteReferences().getRemaining()); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java index 519828592be08..450016b1dc3b8 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java @@ -43,10 +43,8 @@ public void testDecodeWithIncompleteHeader() throws IOException { streamOutput.write('S'); streamOutput.write(1); streamOutput.write(1); - streamOutput.write(0); - streamOutput.write(0); - assertNull(frameDecoder.decode(streamOutput.bytes(), 4)); + assertNull(frameDecoder.decode(streamOutput.bytes())); assertEquals(-1, frameDecoder.expectedMessageLength()); } @@ -56,7 +54,7 @@ public void testDecodePing() throws IOException { streamOutput.write('S'); streamOutput.writeInt(-1); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 6); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(-1, frameDecoder.expectedMessageLength()); assertEquals(streamOutput.bytes(), message); @@ -70,7 +68,7 @@ public void testDecodePingWithStartOfSecondMessage() throws IOException { streamOutput.write('E'); streamOutput.write('S'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(6, message.length()); assertEquals(streamOutput.bytes().slice(0, 6), message); @@ -84,7 +82,7 @@ public void testDecodeMessage() throws IOException { streamOutput.write('M'); streamOutput.write('A'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(-1, frameDecoder.expectedMessageLength()); assertEquals(streamOutput.bytes(), message); @@ -98,7 +96,7 @@ public void testDecodeIncompleteMessage() throws IOException { streamOutput.write('M'); streamOutput.write('A'); - BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8); + BytesReference message = frameDecoder.decode(streamOutput.bytes()); assertEquals(9, frameDecoder.expectedMessageLength()); assertNull(message); @@ -113,7 +111,7 @@ public void testInvalidLength() throws IOException { streamOutput.write('A'); try { - frameDecoder.decode(streamOutput.bytes(), 8); + frameDecoder.decode(streamOutput.bytes()); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(StreamCorruptedException.class)); @@ -134,7 +132,7 @@ public void testInvalidHeader() throws IOException { streamOutput.write(randomByte()); try { - frameDecoder.decode(streamOutput.bytes(), 7); + frameDecoder.decode(streamOutput.bytes()); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(StreamCorruptedException.class)); @@ -158,7 +156,7 @@ public void testHTTPHeader() throws IOException { try { BytesReference bytes = streamOutput.bytes(); - frameDecoder.decode(bytes, bytes.length()); + frameDecoder.decode(bytes); fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(TcpTransport.HttpOnTransportException.class)); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 7586b5abd91e0..dc8f51ece0c2b 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.NetworkBytesReference; +import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.Before; @@ -58,12 +58,12 @@ public void testSuccessfulRead() throws IOException { byte[] fullMessage = combineMessageAndHeader(bytes); final AtomicInteger bufferCapacity = new AtomicInteger(); - when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> { - NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0]; - ByteBuffer buffer = reference.getWriteByteBuffer(); - bufferCapacity.set(reference.getWriteRemaining()); - buffer.put(fullMessage); - reference.incrementWrite(fullMessage.length); + when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { + InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; + ByteBuffer byteBuffer = buffer.getPostIndexBuffers()[0]; + bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); + byteBuffer.put(fullMessage); + buffer.incrementIndex(fullMessage.length); return fullMessage.length; }); @@ -85,12 +85,12 @@ public void testPartialRead() throws IOException { final AtomicInteger bufferCapacity = new AtomicInteger(); final AtomicReference bytes = new AtomicReference<>(); - when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> { - NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0]; - ByteBuffer buffer = reference.getWriteByteBuffer(); - bufferCapacity.set(reference.getWriteRemaining()); - buffer.put(bytes.get()); - reference.incrementWrite(bytes.get().length); + when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { + InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; + ByteBuffer byteBuffer = buffer.getPostIndexBuffers()[0]; + bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); + byteBuffer.put(bytes.get()); + buffer.incrementIndex(bytes.get().length); return bytes.get().length; }); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index 7e6410b6c6164..2492bf4b5da76 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -80,7 +80,7 @@ public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exce assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer()); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences().getPostIndexBuffers()[0]); } public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { @@ -97,7 +97,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer()); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences().getPostIndexBuffers()[0]); } public void testWriteIsQueuedInChannel() throws Exception { From da086d34d641fc99b0825fa172c3c0aa46af1eeb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 27 Nov 2017 16:05:44 -0700 Subject: [PATCH 02/14] Fix test --- .../elasticsearch/common/bytes/ByteBufferReferenceTests.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java index 45fcacaf8b55b..9560fd4003822 100644 --- a/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java +++ b/core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java @@ -32,10 +32,7 @@ private void initializeBytes(byte[] bytes) { @Override protected BytesReference newBytesReference(int length) throws IOException { - int offset = randomInt(27); - byte[] bytes = new byte[length + offset]; - initializeBytes(bytes); - return new ByteBufferReference(ByteBuffer.wrap(bytes, offset, length)); + return newBytesReferenceWithOffsetOfZero(length); } @Override From d3d9f7ad48a5440ad4c1e61390c0d82728573fe1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 27 Nov 2017 16:19:41 -0700 Subject: [PATCH 03/14] Remove method --- .../org/elasticsearch/transport/nio/InboundChannelBuffer.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index aaca053275365..18eb480982af9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -143,10 +143,6 @@ private int indexInPage(int index) { return index & pageMask; } - public int getOffset() { - return offset; - } - public static class Page implements Releasable { private final ByteBuffer buffer; From fc626aa6397bf408669c9f135154ee070435063c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 27 Nov 2017 18:11:22 -0700 Subject: [PATCH 04/14] Add edge cases --- .../elasticsearch/transport/nio/InboundChannelBuffer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 18eb480982af9..9d992502ffec9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -80,6 +80,9 @@ public void releasePagesFromHead(int bytesToRelease) { } public ByteBuffer[] getPreIndexBuffers() { + if (internalIndex == 0) { + return new ByteBuffer[0]; + } int indexWithOffset = internalIndex + offset; int pageCount = pageIndex(indexWithOffset); int finalLimit = indexInPage(indexWithOffset); @@ -103,6 +106,9 @@ public ByteBuffer[] getPreIndexBuffers() { } public ByteBuffer[] getPostIndexBuffers() { + if (internalIndex + offset == capacity) { + return new ByteBuffer[0]; + } int indexWithOffset = offset + internalIndex; int pageIndex = pageIndex(indexWithOffset); int indexInPage = indexInPage(indexWithOffset); From c5f7bc3bfe72da3a4e7e81e80e8c13b40cb95f70 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 27 Nov 2017 19:26:23 -0700 Subject: [PATCH 05/14] Fix issue --- .../org/elasticsearch/transport/nio/InboundChannelBuffer.java | 3 ++- .../elasticsearch/transport/nio/channel/TcpReadContext.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 9d992502ffec9..e2ca5ca736d7e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -106,10 +106,11 @@ public ByteBuffer[] getPreIndexBuffers() { } public ByteBuffer[] getPostIndexBuffers() { - if (internalIndex + offset == capacity) { + if (internalIndex == capacity) { return new ByteBuffer[0]; } int indexWithOffset = offset + internalIndex; + int pageIndex = pageIndex(indexWithOffset); int indexInPage = indexInPage(indexWithOffset); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 00a6a0b501699..1585aaa92332a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -84,7 +84,7 @@ public int read() throws IOException { private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) { ByteBuffer[] writtenToBuffers = channelBuffer.getPreIndexBuffers(); ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length]; - for (int i = 0; i Date: Tue, 28 Nov 2017 10:48:27 -0700 Subject: [PATCH 06/14] Remove outbound bytes --- .../transport/nio/OutboundChannelBytes.java | 72 ------------------- .../transport/nio/WriteOperation.java | 53 +++++++++++--- .../nio/channel/NioSocketChannel.java | 19 ++--- .../transport/nio/WriteOperationTests.java | 14 +--- .../nio/channel/TcpWriteContextTests.java | 4 +- 5 files changed, 53 insertions(+), 109 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java deleted file mode 100644 index 98117c6494ae9..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/OutboundChannelBytes.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -public class OutboundChannelBytes { - - private final ByteBuffer[] buffers; - private final int[] offsets; - private final int length; - private int internalIndex; - - public OutboundChannelBytes(ByteBuffer[] buffers) { - this.buffers = buffers; - this.offsets = new int[buffers.length]; - int offset = 0; - for (int i = 0; i < buffers.length; i++) { - ByteBuffer buffer = buffers[i]; - offsets[i] = offset; - offset += buffer.remaining(); - } - length = offset; - } - - public ByteBuffer[] getPostIndexBuffers() { - int offsetIndex = getOffsetIndex(internalIndex); - - ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex]; - - ByteBuffer firstBuffer = buffers[0].duplicate(); - firstBuffer.position(internalIndex - offsets[offsetIndex]); - postIndexBuffers[offsetIndex] = firstBuffer; - int j = 1; - for (int i = (offsetIndex + 1); i < buffers.length; ++i) { - postIndexBuffers[j++] = buffers[i].duplicate(); - } - - return postIndexBuffers; - } - - public void incrementIndex(int delta) { - internalIndex += delta; - } - - public int getRemaining() { - return length - internalIndex; - } - - private int getOffsetIndex(int offset) { - final int i = Arrays.binarySearch(offsets, offset); - return i < 0 ? (-(i + 1)) - 1 : i; - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index 628bce42fc7bf..b20a99a286532 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -29,21 +29,33 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; public class WriteOperation { private final NioSocketChannel channel; private final ActionListener listener; - private final OutboundChannelBytes outboundBytes; + private final ByteBuffer[] buffers; + private final int[] offsets; + private final int length; + private int internalIndex; public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { this.channel = channel; this.listener = listener; - this.outboundBytes = toChannelBytes(bytesReference); + this.buffers = toByteBuffers(bytesReference); + this.offsets = new int[buffers.length]; + int offset = 0; + for (int i = 0; i < buffers.length; i++) { + ByteBuffer buffer = buffers[i]; + offsets[i] = offset; + offset += buffer.remaining(); + } + length = offset; } - public OutboundChannelBytes getByteReferences() { - return outboundBytes; + public ByteBuffer[] getByteReferences() { + return buffers; } public ActionListener getListener() { @@ -55,15 +67,38 @@ public NioSocketChannel getChannel() { } public boolean isFullyFlushed() { - return outboundBytes.getRemaining() == 0; + return internalIndex == length; } public int flush() throws IOException { - return channel.write(outboundBytes); + int written = channel.write(getBuffersToWrite()); + internalIndex += written; + return written; + } + + private ByteBuffer[] getBuffersToWrite() { + int offsetIndex = getOffsetIndex(internalIndex); + + ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex]; + + ByteBuffer firstBuffer = buffers[0].duplicate(); + firstBuffer.position(internalIndex - offsets[offsetIndex]); + postIndexBuffers[offsetIndex] = firstBuffer; + int j = 1; + for (int i = (offsetIndex + 1); i < buffers.length; ++i) { + postIndexBuffers[j++] = buffers[i].duplicate(); + } + + return postIndexBuffers; + } + + private int getOffsetIndex(int offset) { + final int i = Arrays.binarySearch(offsets, offset); + return i < 0 ? (-(i + 1)) - 1 : i; } - private static OutboundChannelBytes toChannelBytes(BytesReference reference) { - BytesRefIterator byteRefIterator = reference.iterator(); + private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) { + BytesRefIterator byteRefIterator = bytesReference.iterator(); BytesRef r; try { // Most network messages are composed of three buffers. @@ -71,7 +106,7 @@ private static OutboundChannelBytes toChannelBytes(BytesReference reference) { while ((r = byteRefIterator.next()) != null) { buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length)); } - return new OutboundChannelBytes(buffers.toArray(new ByteBuffer[buffers.size()])); + return buffers.toArray(new ByteBuffer[buffers.size()]); } catch (IOException e) { // this is really an error since we don't do IO in our bytesreferences diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index 0fae413cd2b66..2ea0c2bae4ecc 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.transport.nio.InboundChannelBuffer; -import org.elasticsearch.transport.nio.OutboundChannelBytes; import org.elasticsearch.transport.nio.SocketSelector; import java.io.IOException; @@ -66,22 +65,12 @@ public SocketSelector getSelector() { return socketSelector; } - public int write(OutboundChannelBytes channelBytes) throws IOException { - ByteBuffer[] writeBuffers = channelBytes.getPostIndexBuffers(); - int written; - if (writeBuffers.length == 1) { - written = socketChannel.write(writeBuffers[0]); + public int write(ByteBuffer[] buffers) throws IOException { + if (buffers.length == 1) { + return socketChannel.write(buffers[0]); } else { - written = (int) socketChannel.write(writeBuffers); + return (int) socketChannel.write(buffers); } - - if (written <= 0) { - return written; - } - - channelBytes.incrementIndex(written); - - return written; } public int read(InboundChannelBuffer buffer) throws IOException { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java index 51d54a440db24..0015d39a37366 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java @@ -26,6 +26,7 @@ import org.junit.Before; import java.io.IOException; +import java.nio.ByteBuffer; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -48,11 +49,7 @@ public void testFlush() throws IOException { WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); - when(channel.write(any())).thenAnswer(invocationOnMock -> { - OutboundChannelBytes refs = (OutboundChannelBytes) invocationOnMock.getArguments()[0]; - refs.incrementIndex(10); - return 10; - }); + when(channel.write(any(ByteBuffer[].class))).thenReturn(10); writeOp.flush(); @@ -62,15 +59,10 @@ public void testFlush() throws IOException { public void testPartialFlush() throws IOException { WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); - when(channel.write(any())).thenAnswer(invocationOnMock -> { - OutboundChannelBytes refs = (OutboundChannelBytes) invocationOnMock.getArguments()[0]; - refs.incrementIndex(5); - return 5; - }); + when(channel.write(any(ByteBuffer[].class))).thenReturn(5); writeOp.flush(); assertFalse(writeOp.isFullyFlushed()); - assertEquals(5, writeOp.getByteReferences().getRemaining()); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index 2492bf4b5da76..a167de9e5bdd4 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -80,7 +80,7 @@ public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exce assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences().getPostIndexBuffers()[0]); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0]); } public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { @@ -97,7 +97,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences().getPostIndexBuffers()[0]); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0]); } public void testWriteIsQueuedInChannel() throws Exception { From 5131f3b75c9a5a22a435813cf9b727ba1f5105d8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 28 Nov 2017 10:50:46 -0700 Subject: [PATCH 07/14] Rename method --- .../java/org/elasticsearch/transport/nio/WriteOperation.java | 2 +- .../transport/nio/channel/TcpWriteContextTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index b20a99a286532..0abb6a6765046 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -54,7 +54,7 @@ public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, A length = offset; } - public ByteBuffer[] getByteReferences() { + public ByteBuffer[] getByteBuffers() { return buffers; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index a167de9e5bdd4..8d7a33649e4dc 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -80,7 +80,7 @@ public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exce assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0]); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]); } public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { @@ -97,7 +97,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { assertSame(listener, writeOp.getListener()); assertSame(channel, writeOp.getChannel()); - assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0]); + assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]); } public void testWriteIsQueuedInChannel() throws Exception { From 5ff7bf5931c10222b74dffc737a9f5726e866ab2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 28 Nov 2017 15:27:59 -0700 Subject: [PATCH 08/14] Cleanup channel buffer and add tests --- .../transport/nio/InboundChannelBuffer.java | 14 ++- .../transport/nio/channel/TcpReadContext.java | 2 +- .../nio/InboundChannelBufferTests.java | 112 +++++++++++++----- 3 files changed, 96 insertions(+), 32 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index e2ca5ca736d7e..86b7a88a842cb 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -29,7 +29,6 @@ public class InboundChannelBuffer { public static final int PAGE_SIZE = 1 << 14; - private final int pageMask; private final int pageShift; @@ -66,7 +65,7 @@ public void expandCapacity(int newCapacity) { public void releasePagesFromHead(int bytesToRelease) { if (bytesToRelease > capacity) { - throw new IllegalArgumentException("Releasing more bytes than allocated."); + throw new IllegalArgumentException("Releasing more bytes [" + bytesToRelease + "] than buffer capacity [" + capacity + "]."); } int pagesToRelease = pageIndex(offset + bytesToRelease); @@ -127,7 +126,12 @@ public ByteBuffer[] getPostIndexBuffers() { } public void incrementIndex(int delta) { - internalIndex += delta; + int newIndex = delta + internalIndex; + if (newIndex > capacity) { + throw new IllegalArgumentException("Cannot increment an index [" + internalIndex + "] with a delta [" + delta + + "] that will result in a new index [" + newIndex + "] that is greater than the capacity [" + capacity + "]."); + } + internalIndex = newIndex; } public int getIndex() { @@ -138,6 +142,10 @@ public int getCapacity() { return capacity; } + public int getRemaining() { + return capacity - internalIndex; + } + private int numPages(int capacity) { return (capacity + pageMask) >>> pageShift; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 1585aaa92332a..4a7f9dc67f96b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -47,7 +47,7 @@ public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFr @Override public int read() throws IOException { - if ((channelBuffer.getCapacity() - channelBuffer.getIndex()) == 0) { + if (channelBuffer.getRemaining() == 0) { channelBuffer.expandCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java index 2a4982c4c8b70..bdf80fb9cb70d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java @@ -21,75 +21,131 @@ import org.elasticsearch.test.ESTestCase; +import java.nio.ByteBuffer; + public class InboundChannelBufferTests extends ESTestCase { private static final int PAGE_SIZE = 1 << 14; public void testNewBufferHasSinglePage() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); - assertEquals(PAGE_SIZE, inboundBuffer.getCapacity()); - assertEquals(0, inboundBuffer.getIndex()); + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); + assertEquals(0, channelBuffer.getIndex()); } public void testExpandCapacity() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); - assertEquals(PAGE_SIZE, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); - inboundBuffer.expandCapacity(PAGE_SIZE + 1); + channelBuffer.expandCapacity(PAGE_SIZE + 1); - assertEquals(PAGE_SIZE * 2, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2, channelBuffer.getRemaining()); } public void testExpandCapacityMultiplePages() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); - assertEquals(PAGE_SIZE, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); int multiple = randomInt(80); - inboundBuffer.expandCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500))); + channelBuffer.expandCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500))); - assertEquals(PAGE_SIZE * (multiple + 1), inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getRemaining()); } public void testExpandCapacityRespectsOffset() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); - assertEquals(PAGE_SIZE, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); int offset = randomInt(300); - inboundBuffer.releasePagesFromHead(offset); + channelBuffer.releasePagesFromHead(offset); - assertEquals(PAGE_SIZE - offset, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE - offset, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE - offset, channelBuffer.getRemaining()); - inboundBuffer.expandCapacity(PAGE_SIZE + 1); + channelBuffer.expandCapacity(PAGE_SIZE + 1); - assertEquals(PAGE_SIZE * 2 - offset, inboundBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getCapacity()); + assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getRemaining()); } public void testIncrementIndex() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); - assertEquals(0, inboundBuffer.getIndex()); + assertEquals(0, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); - inboundBuffer.incrementIndex(10); + channelBuffer.incrementIndex(10); - assertEquals(10, inboundBuffer.getIndex()); + assertEquals(10, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining()); } public void testIncrementIndexWithOffset() { - InboundChannelBuffer inboundBuffer = new InboundChannelBuffer(); + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + assertEquals(0, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); - assertEquals(0, inboundBuffer.getIndex()); + channelBuffer.releasePagesFromHead(10); + assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining()); - inboundBuffer.releasePagesFromHead(10); - inboundBuffer.incrementIndex(10); + channelBuffer.incrementIndex(10); - assertEquals(10, inboundBuffer.getIndex()); + assertEquals(10, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); + + channelBuffer.releasePagesFromHead(2); + assertEquals(8, channelBuffer.getIndex()); + assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); + } - inboundBuffer.releasePagesFromHead(2); - assertEquals(8, inboundBuffer.getIndex()); + public void testAccessByteBuffers() { + InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); + + int pages = randomInt(50) + 5; + channelBuffer.expandCapacity(pages * PAGE_SIZE); + + int capacity = channelBuffer.getCapacity(); + + ByteBuffer[] postIndexBuffers = channelBuffer.getPostIndexBuffers(); + int i = 0; + for (ByteBuffer buffer : postIndexBuffers) { + while (buffer.hasRemaining()) { + buffer.put((byte) (i++ % 127)); + } + } + + int indexIncremented = 0; + int bytesReleased = 0; + while (indexIncremented < capacity) { + assertEquals(indexIncremented - bytesReleased, channelBuffer.getIndex()); + + int amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining()); + assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), channelBuffer.getPostIndexBuffers()[0].get()); + ByteBuffer[] preIndexBuffers = channelBuffer.getPreIndexBuffers(); + if (preIndexBuffers.length > 0) { + ByteBuffer preIndexBuffer = preIndexBuffers[preIndexBuffers.length - 1]; + assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased - 1) % 127), preIndexBuffer.get(preIndexBuffer.limit() - 1)); + } + if (randomBoolean()) { + int bytesToRelease = Math.min(randomInt(50), channelBuffer.getIndex()); + channelBuffer.releasePagesFromHead(bytesToRelease); + bytesReleased += bytesToRelease; + } + channelBuffer.incrementIndex(amountToInc); + indexIncremented += amountToInc; + } + + assertEquals(0, channelBuffer.getPostIndexBuffers().length); } } From 63dcf73a78052e2eef33f6f2aaf12ade36c7d5d7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 29 Nov 2017 16:32:25 -0700 Subject: [PATCH 09/14] Adjust for review --- .../common/bytes/ByteBufferReference.java | 17 +++- .../transport/nio/InboundChannelBuffer.java | 79 +++++++++++++------ .../transport/nio/channel/TcpReadContext.java | 4 +- .../nio/InboundChannelBufferTests.java | 22 +++--- .../nio/channel/TcpReadContextTests.java | 5 +- 5 files changed, 87 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java index d21d2bb13209e..fbdcdfd68853d 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java @@ -23,6 +23,15 @@ import java.nio.ByteBuffer; +/** + * This is a {@link BytesReference} backed by a {@link ByteBuffer}. The byte buffer can either be a heap or + * direct byte buffer. The reference is composed of the space between the {@link ByteBuffer#position} and + * {@link ByteBuffer#limit} at construction time. If the position or limit of the underlying byte buffer is + * changed, those changes will not be reflected in this reference. However, modifying the limit or position + * of the underlying byte buffer is not recommended as those can be used during {@link ByteBuffer#get()} + * bounds checks. Use {@link ByteBuffer#duplicate()} at creation time if you plan on modifying the markers of + * the underlying byte buffer. Any changes to the underlying data in the byte buffer will be reflected. + */ public class ByteBufferReference extends BytesReference { private final ByteBuffer buffer; @@ -48,7 +57,7 @@ public int length() { @Override public BytesReference slice(int from, int length) { if (from < 0 || (from + length) > this.length) { - throw new IllegalArgumentException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" + throw new IndexOutOfBoundsException("can't slice a buffer with length [" + this.length + "], with slice parameters from [" + from + "], length [" + length + "]"); } ByteBuffer newByteBuffer = buffer.duplicate(); @@ -57,6 +66,12 @@ public BytesReference slice(int from, int length) { return new ByteBufferReference(newByteBuffer); } + /** + * This will return a bytes ref composed of the bytes. If this is a direct byte buffer, the bytes will + * have to be copied. + * + * @return the bytes ref + */ @Override public BytesRef toBytesRef() { if (buffer.hasArray()) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 86b7a88a842cb..52e3fec999b33 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -29,32 +29,34 @@ public class InboundChannelBuffer { public static final int PAGE_SIZE = 1 << 14; + private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; + private final int pageMask; private final int pageShift; private final ArrayDeque pages; private final Supplier pageSupplier; - private int capacity = 0; - private int internalIndex = 0; + private long capacity = 0; + private long internalIndex = 0; private int offset = 0; public InboundChannelBuffer() { this(() -> new Page(ByteBuffer.wrap(new byte[PAGE_SIZE]), () -> {})); } - public InboundChannelBuffer(Supplier pageSupplier) { + private InboundChannelBuffer(Supplier pageSupplier) { this.pageSupplier = pageSupplier; this.pages = new ArrayDeque<>(); this.pageMask = PAGE_SIZE - 1; this.pageShift = Integer.numberOfTrailingZeros(PAGE_SIZE); this.capacity = PAGE_SIZE * pages.size(); - expandCapacity(PAGE_SIZE); + ensureCapacity(PAGE_SIZE); } - public void expandCapacity(int newCapacity) { - if (capacity < newCapacity) { - int numPages = numPages(newCapacity + offset); + public void ensureCapacity(long requiredCapacity) { + if (capacity < requiredCapacity) { + int numPages = numPages(requiredCapacity + offset); int pagesToAdd = numPages - pages.size(); for (int i = 0; i < pagesToAdd; ++i) { pages.addLast(pageSupplier.get()); @@ -63,7 +65,12 @@ public void expandCapacity(int newCapacity) { } } - public void releasePagesFromHead(int bytesToRelease) { + /** + * This method will release bytes from the head of this buffer. + * + * @param bytesToRelease number of bytes to drop + */ + public void release(long bytesToRelease) { if (bytesToRelease > capacity) { throw new IllegalArgumentException("Releasing more bytes [" + bytesToRelease + "] than buffer capacity [" + capacity + "]."); } @@ -78,11 +85,19 @@ public void releasePagesFromHead(int bytesToRelease) { offset = indexInPage(bytesToRelease + offset); } + /** + * This method will return an array of {@link ByteBuffer} representing the bytes from the beginning of + * this buffer up through the current index. The buffers will be duplicates of the internal buffers, so + * any modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc will + * not modify the this class. + * + * @return the byte buffers + */ public ByteBuffer[] getPreIndexBuffers() { if (internalIndex == 0) { - return new ByteBuffer[0]; + return EMPTY_BYTE_BUFFER_ARRAY; } - int indexWithOffset = internalIndex + offset; + long indexWithOffset = internalIndex + offset; int pageCount = pageIndex(indexWithOffset); int finalLimit = indexInPage(indexWithOffset); if (finalLimit != 0) { @@ -104,11 +119,19 @@ public ByteBuffer[] getPreIndexBuffers() { return buffers; } + /** + * This method will return an array of {@link ByteBuffer} representing the bytes from the current index + * of this buffer through the end. The buffers will be duplicates of the internal buffers, so any + * modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc will not + * modify the this class. + * + * @return the byte buffers + */ public ByteBuffer[] getPostIndexBuffers() { if (internalIndex == capacity) { return new ByteBuffer[0]; } - int indexWithOffset = offset + internalIndex; + long indexWithOffset = offset + internalIndex; int pageIndex = pageIndex(indexWithOffset); int indexInPage = indexInPage(indexWithOffset); @@ -125,8 +148,12 @@ public ByteBuffer[] getPostIndexBuffers() { return buffers; } - public void incrementIndex(int delta) { - int newIndex = delta + internalIndex; + public void incrementIndex(long delta) { + if (delta < 0) { + throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]"); + } + + long newIndex = delta + internalIndex; if (newIndex > capacity) { throw new IllegalArgumentException("Cannot increment an index [" + internalIndex + "] with a delta [" + delta + "] that will result in a new index [" + newIndex + "] that is greater than the capacity [" + capacity + "]."); @@ -134,37 +161,41 @@ public void incrementIndex(int delta) { internalIndex = newIndex; } - public int getIndex() { + public long getIndex() { return internalIndex; } - public int getCapacity() { + public long getCapacity() { return capacity; } - public int getRemaining() { + public long getRemaining() { return capacity - internalIndex; } - private int numPages(int capacity) { - return (capacity + pageMask) >>> pageShift; + private int numPages(long capacity) { + final long numPages = (capacity + pageMask) >>> pageShift; + if (numPages > Integer.MAX_VALUE) { + throw new IllegalArgumentException("pageSize=" + (pageMask + 1) + " is too small for such as capacity: " + capacity); + } + return (int) numPages; } - private int pageIndex(int index) { - return index >>> pageShift; + private int pageIndex(long index) { + return (int) (index >>> pageShift); } - private int indexInPage(int index) { - return index & pageMask; + private int indexInPage(long index) { + return (int) (index & pageMask); } - public static class Page implements Releasable { + private static class Page implements Releasable { private final ByteBuffer buffer; private final Releasable releasable; - public Page(ByteBuffer buffer, Releasable releasable) { + private Page(ByteBuffer buffer, Releasable releasable) { this.buffer = buffer; this.releasable = releasable; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 4a7f9dc67f96b..16a822e7478aa 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -48,7 +48,7 @@ public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFr @Override public int read() throws IOException { if (channelBuffer.getRemaining() == 0) { - channelBuffer.expandCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); + channelBuffer.ensureCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); } int bytesRead = channel.read(channelBuffer); @@ -74,7 +74,7 @@ public int read() throws IOException { } catch (Exception e) { handler.handleException(channel, e); } finally { - channelBuffer.releasePagesFromHead(messageLengthWithHeader); + channelBuffer.release(messageLengthWithHeader); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java index bdf80fb9cb70d..c798f5dd9ca5d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java @@ -41,7 +41,7 @@ public void testExpandCapacity() { assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); - channelBuffer.expandCapacity(PAGE_SIZE + 1); + channelBuffer.ensureCapacity(PAGE_SIZE + 1); assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE * 2, channelBuffer.getRemaining()); @@ -53,7 +53,7 @@ public void testExpandCapacityMultiplePages() { assertEquals(PAGE_SIZE, channelBuffer.getCapacity()); int multiple = randomInt(80); - channelBuffer.expandCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500))); + channelBuffer.ensureCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500))); assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getCapacity()); assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getRemaining()); @@ -67,12 +67,12 @@ public void testExpandCapacityRespectsOffset() { int offset = randomInt(300); - channelBuffer.releasePagesFromHead(offset); + channelBuffer.release(offset); assertEquals(PAGE_SIZE - offset, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE - offset, channelBuffer.getRemaining()); - channelBuffer.expandCapacity(PAGE_SIZE + 1); + channelBuffer.ensureCapacity(PAGE_SIZE + 1); assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getCapacity()); assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getRemaining()); @@ -96,7 +96,7 @@ public void testIncrementIndexWithOffset() { assertEquals(0, channelBuffer.getIndex()); assertEquals(PAGE_SIZE, channelBuffer.getRemaining()); - channelBuffer.releasePagesFromHead(10); + channelBuffer.release(10); assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining()); channelBuffer.incrementIndex(10); @@ -104,7 +104,7 @@ public void testIncrementIndexWithOffset() { assertEquals(10, channelBuffer.getIndex()); assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); - channelBuffer.releasePagesFromHead(2); + channelBuffer.release(2); assertEquals(8, channelBuffer.getIndex()); assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining()); } @@ -113,9 +113,9 @@ public void testAccessByteBuffers() { InboundChannelBuffer channelBuffer = new InboundChannelBuffer(); int pages = randomInt(50) + 5; - channelBuffer.expandCapacity(pages * PAGE_SIZE); + channelBuffer.ensureCapacity(pages * PAGE_SIZE); - int capacity = channelBuffer.getCapacity(); + long capacity = channelBuffer.getCapacity(); ByteBuffer[] postIndexBuffers = channelBuffer.getPostIndexBuffers(); int i = 0; @@ -130,7 +130,7 @@ public void testAccessByteBuffers() { while (indexIncremented < capacity) { assertEquals(indexIncremented - bytesReleased, channelBuffer.getIndex()); - int amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining()); + long amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining()); assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), channelBuffer.getPostIndexBuffers()[0].get()); ByteBuffer[] preIndexBuffers = channelBuffer.getPreIndexBuffers(); if (preIndexBuffers.length > 0) { @@ -138,8 +138,8 @@ public void testAccessByteBuffers() { assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased - 1) % 127), preIndexBuffer.get(preIndexBuffer.limit() - 1)); } if (randomBoolean()) { - int bytesToRelease = Math.min(randomInt(50), channelBuffer.getIndex()); - channelBuffer.releasePagesFromHead(bytesToRelease); + long bytesToRelease = Math.min(randomInt(50), channelBuffer.getIndex()); + channelBuffer.release(bytesToRelease); bytesReleased += bytesToRelease; } channelBuffer.incrementIndex(amountToInc); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index dc8f51ece0c2b..d4b0a8f563d08 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.mockito.Matchers.any; @@ -57,7 +58,7 @@ public void testSuccessfulRead() throws IOException { byte[] bytes = createMessage(messageLength); byte[] fullMessage = combineMessageAndHeader(bytes); - final AtomicInteger bufferCapacity = new AtomicInteger(); + final AtomicLong bufferCapacity = new AtomicLong(); when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; ByteBuffer byteBuffer = buffer.getPostIndexBuffers()[0]; @@ -82,7 +83,7 @@ public void testPartialRead() throws IOException { byte[] fullPart1 = combineMessageAndHeader(part1, messageLength + messageLength); byte[] part2 = createMessage(messageLength); - final AtomicInteger bufferCapacity = new AtomicInteger(); + final AtomicLong bufferCapacity = new AtomicLong(); final AtomicReference bytes = new AtomicReference<>(); when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { From 55c4da06a5c458cd14935d8aa0e232fbad3be815 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 29 Nov 2017 19:01:41 -0700 Subject: [PATCH 10/14] Add documentation --- .../elasticsearch/transport/nio/InboundChannelBuffer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 52e3fec999b33..09251ecbb138c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -26,6 +26,12 @@ import java.util.Iterator; import java.util.function.Supplier; +/** + * This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read + * and consumed, the {@link #release(long)} method releases the bytes from the head of the buffer and closes + * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can + * be called and the buffer will expand using the supplier provided. + */ public class InboundChannelBuffer { public static final int PAGE_SIZE = 1 << 14; From 265a8f319cd1d64cb971525959f70028285c802c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Dec 2017 10:33:41 -0700 Subject: [PATCH 11/14] Changes based on review --- .../transport/nio/InboundChannelBuffer.java | 68 +++++++------------ .../transport/nio/channel/TcpReadContext.java | 3 +- 2 files changed, 26 insertions(+), 45 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 09251ecbb138c..0a92176b7ec65 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -19,8 +19,6 @@ package org.elasticsearch.transport.nio; -import org.elasticsearch.common.lease.Releasable; - import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Iterator; @@ -32,30 +30,29 @@ * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can * be called and the buffer will expand using the supplier provided. */ -public class InboundChannelBuffer { +public final class InboundChannelBuffer { - public static final int PAGE_SIZE = 1 << 14; + private static final int PAGE_SIZE = 1 << 14; + private static final int PAGE_MASK = PAGE_SIZE - 1; + private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; - private final int pageMask; - private final int pageShift; - private final ArrayDeque pages; - private final Supplier pageSupplier; + private final ArrayDeque pages; + private final Supplier pageSupplier; private long capacity = 0; private long internalIndex = 0; + // The offset is an int as it is the offset of where the bytes begin in the first buffer private int offset = 0; public InboundChannelBuffer() { - this(() -> new Page(ByteBuffer.wrap(new byte[PAGE_SIZE]), () -> {})); + this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE])); } - private InboundChannelBuffer(Supplier pageSupplier) { + private InboundChannelBuffer(Supplier pageSupplier) { this.pageSupplier = pageSupplier; this.pages = new ArrayDeque<>(); - this.pageMask = PAGE_SIZE - 1; - this.pageShift = Integer.numberOfTrailingZeros(PAGE_SIZE); this.capacity = PAGE_SIZE * pages.size(); ensureCapacity(PAGE_SIZE); } @@ -72,7 +69,8 @@ public void ensureCapacity(long requiredCapacity) { } /** - * This method will release bytes from the head of this buffer. + * This method will release bytes from the head of this buffer. If you release bytes past the current + * index the index is truncated to zero. * * @param bytesToRelease number of bytes to drop */ @@ -83,8 +81,7 @@ public void release(long bytesToRelease) { int pagesToRelease = pageIndex(offset + bytesToRelease); for (int i = 0; i < pagesToRelease; ++i) { - Page page = pages.removeFirst(); - page.close(); + pages.removeFirst(); } capacity -= bytesToRelease; internalIndex = Math.max(internalIndex - bytesToRelease, 0); @@ -111,12 +108,12 @@ public ByteBuffer[] getPreIndexBuffers() { } ByteBuffer[] buffers = new ByteBuffer[pageCount]; - Iterator pageIterator = pages.iterator(); - ByteBuffer firstBuffer = pageIterator.next().buffer.duplicate(); + Iterator pageIterator = pages.iterator(); + ByteBuffer firstBuffer = pageIterator.next().duplicate(); firstBuffer.position(firstBuffer.position() + offset); buffers[0] = firstBuffer; - for (int i = 1; i < buffers.length; ++i) { - buffers[i] = pageIterator.next().buffer.duplicate(); + for (int i = 1; i < buffers.length; i++) { + buffers[i] = pageIterator.next().duplicate(); } if (finalLimit != 0) { buffers[buffers.length - 1].limit(finalLimit); @@ -135,7 +132,7 @@ public ByteBuffer[] getPreIndexBuffers() { */ public ByteBuffer[] getPostIndexBuffers() { if (internalIndex == capacity) { - return new ByteBuffer[0]; + return EMPTY_BYTE_BUFFER_ARRAY; } long indexWithOffset = offset + internalIndex; @@ -143,11 +140,11 @@ public ByteBuffer[] getPostIndexBuffers() { int indexInPage = indexInPage(indexWithOffset); ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex]; - Iterator pageIterator = pages.descendingIterator(); + Iterator pageIterator = pages.descendingIterator(); for (int i = buffers.length - 1; i > 0; --i) { - buffers[i] = pageIterator.next().buffer.duplicate(); + buffers[i] = pageIterator.next().duplicate(); } - ByteBuffer firstPostIndexBuffer = pageIterator.next().buffer.duplicate(); + ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate(); firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage); buffers[0] = firstPostIndexBuffer; @@ -180,35 +177,18 @@ public long getRemaining() { } private int numPages(long capacity) { - final long numPages = (capacity + pageMask) >>> pageShift; + final long numPages = (capacity + PAGE_MASK) >>> PAGE_SHIFT; if (numPages > Integer.MAX_VALUE) { - throw new IllegalArgumentException("pageSize=" + (pageMask + 1) + " is too small for such as capacity: " + capacity); + throw new IllegalArgumentException("pageSize=" + (PAGE_MASK + 1) + " is too small for such as capacity: " + capacity); } return (int) numPages; } private int pageIndex(long index) { - return (int) (index >>> pageShift); + return (int) (index >>> PAGE_SHIFT); } private int indexInPage(long index) { - return (int) (index & pageMask); - } - - private static class Page implements Releasable { - - private final ByteBuffer buffer; - private final Releasable releasable; - - - private Page(ByteBuffer buffer, Releasable releasable) { - this.buffer = buffer; - this.releasable = releasable; - } - - @Override - public void close() { - releasable.close(); - } + return (int) (index & PAGE_MASK); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 16a822e7478aa..020c66088726e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -48,7 +48,8 @@ public TcpReadContext(TcpNioSocketChannel channel, TcpReadHandler handler, TcpFr @Override public int read() throws IOException { if (channelBuffer.getRemaining() == 0) { - channelBuffer.ensureCapacity(channelBuffer.getCapacity() + InboundChannelBuffer.PAGE_SIZE); + // Requiring one additional byte will ensure that a new page is allocated. + channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1); } int bytesRead = channel.read(channelBuffer); From 62daeb57370ba46097d0c01ede0dd4d436c0b179 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Dec 2017 10:39:31 -0700 Subject: [PATCH 12/14] Changes based on review --- .../org/elasticsearch/transport/nio/InboundChannelBuffer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 0a92176b7ec65..4e8b1a29862df 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -61,7 +61,7 @@ public void ensureCapacity(long requiredCapacity) { if (capacity < requiredCapacity) { int numPages = numPages(requiredCapacity + offset); int pagesToAdd = numPages - pages.size(); - for (int i = 0; i < pagesToAdd; ++i) { + for (int i = 0; i < pagesToAdd; i++) { pages.addLast(pageSupplier.get()); } capacity += pagesToAdd * PAGE_SIZE; @@ -80,7 +80,7 @@ public void release(long bytesToRelease) { } int pagesToRelease = pageIndex(offset + bytesToRelease); - for (int i = 0; i < pagesToRelease; ++i) { + for (int i = 0; i < pagesToRelease; i++) { pages.removeFirst(); } capacity -= bytesToRelease; From 1678ce6346f694ff8e15f11b7b108c4ba9b56dfc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Dec 2017 15:47:39 -0700 Subject: [PATCH 13/14] Changes based on review --- .../transport/nio/InboundChannelBuffer.java | 30 ++++++++++++------- .../nio/channel/NioSocketChannel.java | 2 +- .../transport/nio/channel/TcpReadContext.java | 2 +- .../nio/InboundChannelBufferTests.java | 9 +++--- .../nio/channel/TcpReadContextTests.java | 5 ++-- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index 4e8b1a29862df..d8bba7f83c65f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -90,17 +90,21 @@ public void release(long bytesToRelease) { /** * This method will return an array of {@link ByteBuffer} representing the bytes from the beginning of - * this buffer up through the current index. The buffers will be duplicates of the internal buffers, so - * any modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc will - * not modify the this class. + * this buffer up through the index argument that was passed. The buffers will be duplicates of the + * internal buffers, so any modifications to the markers {@link ByteBuffer#position()}, + * {@link ByteBuffer#limit()}, etc will not modify the this class. * + * @param to the index to slice up to * @return the byte buffers */ - public ByteBuffer[] getPreIndexBuffers() { - if (internalIndex == 0) { + public ByteBuffer[] sliceBuffersTo(long to) { + if (to > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters to [" + to + "]"); + } else if (to == 0) { return EMPTY_BYTE_BUFFER_ARRAY; } - long indexWithOffset = internalIndex + offset; + long indexWithOffset = to + offset; int pageCount = pageIndex(indexWithOffset); int finalLimit = indexInPage(indexWithOffset); if (finalLimit != 0) { @@ -123,18 +127,22 @@ public ByteBuffer[] getPreIndexBuffers() { } /** - * This method will return an array of {@link ByteBuffer} representing the bytes from the current index - * of this buffer through the end. The buffers will be duplicates of the internal buffers, so any + * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed + * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any * modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc will not * modify the this class. * + * @param from the index to slice from * @return the byte buffers */ - public ByteBuffer[] getPostIndexBuffers() { - if (internalIndex == capacity) { + public ByteBuffer[] sliceBuffersFrom(long from) { + if (from > capacity) { + throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity + + "], with slice parameters from [" + from + "]"); + } else if (from == capacity) { return EMPTY_BYTE_BUFFER_ARRAY; } - long indexWithOffset = offset + internalIndex; + long indexWithOffset = from + offset; int pageIndex = pageIndex(indexWithOffset); int indexInPage = indexInPage(indexWithOffset); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index 2ea0c2bae4ecc..0f6c671508815 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -74,7 +74,7 @@ public int write(ByteBuffer[] buffers) throws IOException { } public int read(InboundChannelBuffer buffer) throws IOException { - int bytesRead = (int) socketChannel.read(buffer.getPostIndexBuffers()); + int bytesRead = (int) socketChannel.read(buffer.sliceBuffersFrom(buffer.getIndex())); if (bytesRead == -1) { return bytesRead; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java index 020c66088726e..ae9fe0fdc933e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java @@ -83,7 +83,7 @@ public int read() throws IOException { } private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) { - ByteBuffer[] writtenToBuffers = channelBuffer.getPreIndexBuffers(); + ByteBuffer[] writtenToBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex()); ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length]; for (int i = 0; i < references.length; ++i) { references[i] = new ByteBufferReference(writtenToBuffers[i]); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java index c798f5dd9ca5d..7232a93871001 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java @@ -117,7 +117,7 @@ public void testAccessByteBuffers() { long capacity = channelBuffer.getCapacity(); - ByteBuffer[] postIndexBuffers = channelBuffer.getPostIndexBuffers(); + ByteBuffer[] postIndexBuffers = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()); int i = 0; for (ByteBuffer buffer : postIndexBuffers) { while (buffer.hasRemaining()) { @@ -131,8 +131,9 @@ public void testAccessByteBuffers() { assertEquals(indexIncremented - bytesReleased, channelBuffer.getIndex()); long amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining()); - assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), channelBuffer.getPostIndexBuffers()[0].get()); - ByteBuffer[] preIndexBuffers = channelBuffer.getPreIndexBuffers(); + ByteBuffer[] postIndexBuffers2 = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()); + assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), postIndexBuffers2[0].get()); + ByteBuffer[] preIndexBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex()); if (preIndexBuffers.length > 0) { ByteBuffer preIndexBuffer = preIndexBuffers[preIndexBuffers.length - 1]; assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased - 1) % 127), preIndexBuffer.get(preIndexBuffer.limit() - 1)); @@ -146,6 +147,6 @@ public void testAccessByteBuffers() { indexIncremented += amountToInc; } - assertEquals(0, channelBuffer.getPostIndexBuffers().length); + assertEquals(0, channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()).length); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index d4b0a8f563d08..73583353f73db 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -61,7 +60,7 @@ public void testSuccessfulRead() throws IOException { final AtomicLong bufferCapacity = new AtomicLong(); when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; - ByteBuffer byteBuffer = buffer.getPostIndexBuffers()[0]; + ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0]; bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); byteBuffer.put(fullMessage); buffer.incrementIndex(fullMessage.length); @@ -88,7 +87,7 @@ public void testPartialRead() throws IOException { when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> { InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0]; - ByteBuffer byteBuffer = buffer.getPostIndexBuffers()[0]; + ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0]; bufferCapacity.set(buffer.getCapacity() - buffer.getIndex()); byteBuffer.put(bytes.get()); buffer.incrementIndex(bytes.get().length); From eaffb6f5bf8f36a295bf44f308f793df8da23f01 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 6 Dec 2017 09:52:52 -0700 Subject: [PATCH 14/14] Change from review --- .../org/elasticsearch/transport/nio/InboundChannelBuffer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java index d8bba7f83c65f..46cec52bb6c32 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java @@ -181,7 +181,9 @@ public long getCapacity() { } public long getRemaining() { - return capacity - internalIndex; + long remaining = capacity - internalIndex; + assert remaining >= 0 : "The remaining [" + remaining + "] number of bytes should not be less than zero."; + return remaining; } private int numPages(long capacity) {