From d5c624098d7b9b17c5b2d48f9554862f73c92ce6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 1 Mar 2019 18:41:30 +0100 Subject: [PATCH 1/5] Optimize Bulk Message Parsing and Message Length Parsing * findNextMarker took almost 1ms per invocation during the PMC rally track * Fixed to be about an order of magnitude faster by using Netty's bulk `ByteBuf` search * It is unnecessary to instantiate an object (the input stream wrapper) and throw it away, just to read the `int` length from the message bytes * Fixed by adding bulk `int` read to BytesReference --- .../netty4/ByteBufBytesReference.java | 11 +++++++++ .../elasticsearch/http/nio/ByteBufUtils.java | 11 +++++++++ .../action/bulk/BulkRequest.java | 11 ++++----- .../action/search/MultiSearchRequest.java | 7 +++--- .../common/bytes/ByteBufferReference.java | 5 ++++ .../common/bytes/BytesReference.java | 23 +++++++++++++++++++ .../elasticsearch/transport/TcpTransport.java | 6 +---- 7 files changed, 59 insertions(+), 15 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java index 3b4aba1028119..4275ca784b6d1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java @@ -45,6 +45,17 @@ public byte get(int index) { return buffer.getByte(offset + index); } + @Override + public int getInt(int index) { + return buffer.getInt(offset + index); + } + + @Override + public int findNextMarker(byte marker, int from, int to) { + final int start = offset + from; + return buffer.forEachByte(start, to - start, value -> value != marker); + } + @Override public int length() { return length; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 1f18049514f60..4f5cb29dd519a 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -90,6 +90,17 @@ public byte get(int index) { return buffer.getByte(offset + index); } + @Override + public int getInt(int index) { + return buffer.getInt(offset + index); + } + + @Override + public int findNextMarker(byte marker, int from, int to) { + final int start = offset + from; + return buffer.forEachByte(start, to - start, value -> value != marker); + } + @Override public int length() { return length; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 42f569c0a9bda..5ed2ee2ccc86a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -615,16 +615,15 @@ public String routing() { return globalRouting; } - private int findNextMarker(byte marker, int from, BytesReference data, int length) { - for (int i = from; i < length; i++) { - if (data.get(i) == marker) { - return i; - } + private static int findNextMarker(byte marker, int from, BytesReference data, int length) { + final int res = data.findNextMarker(marker, from, length); + if (res > -1) { + return res; } if (from != length) { throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]"); } - return -1; + return res; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index 1dc2dc624b277..ba3f7001bc963 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -276,10 +276,9 @@ public static void readMultiLineFormat(BytesReference data, } private static int findNextMarker(byte marker, int from, BytesReference data, int length) { - for (int i = from; i < length; i++) { - if (data.get(i) == marker) { - return i; - } + final int res = data.findNextMarker(marker, from, length); + if (res > -1) { + return res; } if (from != length) { throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]"); diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java index 0f1a6a85ac30c..a36a19edc7e2e 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java @@ -46,6 +46,11 @@ public byte get(int index) { return buffer.get(index); } + @Override + public int getInt(int index) { + return buffer.getInt(index); + } + @Override public int length() { return length; diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index a16b8e4ef3bc5..6902d346a7e5d 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -60,6 +60,29 @@ public static BytesReference bytes(XContentBuilder xContentBuilder) { */ public abstract byte get(int index); + /** + * Returns the integer read from the 4 bytes starting at the given index. + */ + public int getInt(int index) { + return (get(index) & 0xFF) << 24 | (get(index + 1) & 0xFF) << 16 | (get(index + 2) & 0xFF) << 8 | get(index + 3) & 0xFF; + } + + /** + * Finds the index of the first occurrence of the given marker between within the given bounds. + * @param marker marker byte to search + * @param from lower bound for the index to check (inclusive) + * @param to upper bound for the index to check (exclusive) + * @return first index of the marker or {@code -1} if not found + */ + public int findNextMarker(byte marker, int from, int to) { + for (int i = from; i < to; i++) { + if (get(i) == marker) { + return i; + } + } + return -1; + } + /** * The length. */ diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index c477173a4e98a..170b20cba0b87 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -840,11 +840,7 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept + Integer.toHexString(headerBuffer.get(2) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")"); } - final int messageLength; - try (StreamInput input = headerBuffer.streamInput()) { - input.skip(TcpHeader.MARKER_BYTES_SIZE); - messageLength = input.readInt(); - } + final int messageLength = headerBuffer.getInt(TcpHeader.MARKER_BYTES_SIZE); if (messageLength == TransportKeepAlive.PING_DATA_SIZE) { // This is a ping From a756ad15f5da06c020e974a5ba6f1d8f9828f326 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 12:54:20 +0100 Subject: [PATCH 2/5] CR: comments --- .../transport/netty4/ByteBufBytesReference.java | 4 ++-- .../org/elasticsearch/http/nio/ByteBufUtils.java | 4 ++-- .../org/elasticsearch/action/bulk/BulkRequest.java | 14 +++++++------- .../action/search/MultiSearchRequest.java | 14 +++++++------- .../elasticsearch/common/bytes/BytesReference.java | 4 ++-- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java index 4275ca784b6d1..d8af523bf17df 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufBytesReference.java @@ -51,9 +51,9 @@ public int getInt(int index) { } @Override - public int findNextMarker(byte marker, int from, int to) { + public int indexOf(byte marker, int from) { final int start = offset + from; - return buffer.forEachByte(start, to - start, value -> value != marker); + return buffer.forEachByte(start, length - start, value -> value != marker); } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 4f5cb29dd519a..5c857869d0960 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -96,9 +96,9 @@ public int getInt(int index) { } @Override - public int findNextMarker(byte marker, int from, int to) { + public int indexOf(byte marker, int from) { final int start = offset + from; - return buffer.forEachByte(start, to - start, value -> value != marker); + return buffer.forEachByte(start, length - start, value -> value != marker); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 5ed2ee2ccc86a..c17507658bde2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -364,11 +364,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null XContent xContent = xContentType.xContent(); int line = 0; int from = 0; - int length = data.length(); byte marker = xContent.streamSeparator(); boolean typesDeprecationLogged = false; while (true) { - int nextMarker = findNextMarker(marker, from, data, length); + int nextMarker = findNextMarker(marker, from, data); if (nextMarker == -1) { break; } @@ -477,7 +476,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null add(new DeleteRequest(index, type, id).routing(routing) .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload); } else { - nextMarker = findNextMarker(marker, from, data, length); + nextMarker = findNextMarker(marker, from, data); if (nextMarker == -1) { break; } @@ -615,12 +614,13 @@ public String routing() { return globalRouting; } - private static int findNextMarker(byte marker, int from, BytesReference data, int length) { - final int res = data.findNextMarker(marker, from, length); - if (res > -1) { + private static int findNextMarker(byte marker, int from, BytesReference data) { + final int res = data.indexOf(marker, from); + if (res != -1) { + assert res >= 0; return res; } - if (from != length) { + if (from != data.length()) { throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]"); } return res; diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index ba3f7001bc963..195e146ab7ca1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -177,10 +177,9 @@ public static void readMultiLineFormat(BytesReference data, NamedXContentRegistry registry, boolean allowExplicitIndex) throws IOException { int from = 0; - int length = data.length(); byte marker = xContent.streamSeparator(); while (true) { - int nextMarker = findNextMarker(marker, from, data, length); + int nextMarker = findNextMarker(marker, from, data); if (nextMarker == -1) { break; } @@ -261,7 +260,7 @@ public static void readMultiLineFormat(BytesReference data, // move pointers from = nextMarker + 1; // now for the body - nextMarker = findNextMarker(marker, from, data, length); + nextMarker = findNextMarker(marker, from, data); if (nextMarker == -1) { break; } @@ -275,12 +274,13 @@ public static void readMultiLineFormat(BytesReference data, } } - private static int findNextMarker(byte marker, int from, BytesReference data, int length) { - final int res = data.findNextMarker(marker, from, length); - if (res > -1) { + private static int findNextMarker(byte marker, int from, BytesReference data) { + final int res = data.indexOf(marker, from); + if (res != -1) { + assert res >= 0; return res; } - if (from != length) { + if (from != data.length()) { throw new IllegalArgumentException("The msearch request must be terminated by a newline [\n]"); } return -1; diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index 6902d346a7e5d..8219edd7eb3f0 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -71,10 +71,10 @@ public int getInt(int index) { * Finds the index of the first occurrence of the given marker between within the given bounds. * @param marker marker byte to search * @param from lower bound for the index to check (inclusive) - * @param to upper bound for the index to check (exclusive) * @return first index of the marker or {@code -1} if not found */ - public int findNextMarker(byte marker, int from, int to) { + public int indexOf(byte marker, int from) { + final int to = length(); for (int i = from; i < to; i++) { if (get(i) == marker) { return i; From 2cc55861084e5c311f5082ad359cfaaa78e2e4b1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 14:18:26 +0100 Subject: [PATCH 3/5] CR: clarify BE --- .../java/org/elasticsearch/common/bytes/BytesReference.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index 8219edd7eb3f0..2c4867cbdfed9 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -61,7 +61,7 @@ public static BytesReference bytes(XContentBuilder xContentBuilder) { public abstract byte get(int index); /** - * Returns the integer read from the 4 bytes starting at the given index. + * Returns the integer read from the 4 bytes (BE) starting at the given index. */ public int getInt(int index) { return (get(index) & 0xFF) << 24 | (get(index + 1) & 0xFF) << 16 | (get(index + 2) & 0xFF) << 8 | get(index + 3) & 0xFF; From 087064c48c16b03f102e3333465aaa02fc377050 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 15:00:56 +0100 Subject: [PATCH 4/5] add tests --- .../bytes/AbstractBytesReferenceTestCase.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 1a907feabe24a..60a586e9dc4ba 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -34,7 +34,14 @@ import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public abstract class AbstractBytesReferenceTestCase extends ESTestCase { @@ -648,4 +655,32 @@ public void testBasicEquals() { assertNotEquals(b1, b2); } } + + public void testGetInt() throws IOException { + final int count = randomIntBetween(1, 10); + final BytesReference bytesReference = newBytesReference(count * Integer.BYTES); + final IntBuffer intBuffer = ByteBuffer.wrap(bytesReference.toBytesRef().bytes).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); + for (int i = 0; i < count; ++i) { + assertEquals(intBuffer.get(i), bytesReference.getInt(i * Integer.BYTES)); + } + } + + public void testIndexOf() throws IOException { + final int size = randomIntBetween(0, 100); + final BytesReference bytesReference = newBytesReference(size); + final Map> map = new HashMap<>(); + for (int i = 0; i < size; ++i) { + final byte value = bytesReference.get(i); + map.computeIfAbsent(value, v -> new ArrayList<>()).add(i); + } + map.forEach((value, positions) -> { + for (int i = 0; i < positions.size(); i++) { + final int pos = positions.get(i); + final int from = i == 0 ? randomIntBetween(0, pos) : positions.get(i - 1) + 1; + assertEquals(bytesReference.indexOf(value, from), pos); + } + }); + final byte missing = randomValueOtherThanMany(map::containsKey, ESTestCase::randomByte); + assertEquals(-1, bytesReference.indexOf(missing, randomIntBetween(0, Math.max(0, size - 1)))); + } } From 83b8cf041ca1147361351c965e7f5d71503519c0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 15:28:36 +0100 Subject: [PATCH 5/5] fix test --- .../common/bytes/AbstractBytesReferenceTestCase.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 60a586e9dc4ba..478ac149dd1f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -659,7 +659,9 @@ public void testBasicEquals() { public void testGetInt() throws IOException { final int count = randomIntBetween(1, 10); final BytesReference bytesReference = newBytesReference(count * Integer.BYTES); - final IntBuffer intBuffer = ByteBuffer.wrap(bytesReference.toBytesRef().bytes).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); + final BytesRef bytesRef = bytesReference.toBytesRef(); + final IntBuffer intBuffer = + ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); for (int i = 0; i < count; ++i) { assertEquals(intBuffer.get(i), bytesReference.getInt(i * Integer.BYTES)); }