From 9184c7fbdd191173063345a22f894086a6cf9b99 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 2 Dec 2018 16:34:07 -0700 Subject: [PATCH 01/13] WIP --- .../elasticsearch/transport/TcpTransport.java | 6 +- .../transport/TcpTransportHandshaker.java | 60 ++++++++++++++++--- .../transport/TransportStatus.java | 8 +++ .../transport/TransportHandshakerTests.java | 6 +- .../AbstractSimpleTransportTestCase.java | 12 ++-- 5 files changed, 71 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 659c264ab3749..438008dedbf59 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -223,8 +223,8 @@ public TcpTransport(String transportName, Settings settings, Version version, T this.transportLogger = new TransportLogger(); this.handshaker = new TcpTransportHandshaker(version, threadPool, (node, channel, requestId, v) -> sendRequestToChannel(node, channel, requestId, - TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE, TransportRequestOptions.EMPTY, v, - TransportStatus.setHandshake((byte) 0)), + TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, new TcpTransportHandshaker.HandshakeRequest(version), + TransportRequestOptions.EMPTY, v, TransportStatus.setHandshake((byte) 0)), (v, features, channel, response, requestId) -> sendResponse(v, features, channel, response, requestId, TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0))); this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage); @@ -1284,7 +1284,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str TransportChannel transportChannel = null; try { if (TransportStatus.isHandshake(status)) { - handshaker.handleHandshake(version, features, channel, requestId); + handshaker.handleHandshake(version, features, channel, requestId, stream); } else { final RequestHandlerRegistry reg = getRequestHandler(action); if (reg == null) { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java index d1037d2bcb5bd..108ca969a7471 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java @@ -84,10 +84,15 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV } void handleHandshake(Version version, Set features, TcpChannel channel, long requestId) throws IOException { - handshakeResponseSender.sendResponse(version, features, channel, new VersionHandshakeResponse(this.version), requestId); + handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(this.version), requestId); } - TransportResponseHandler removeHandlerForHandshake(long requestId) { + void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput input) throws IOException { + HandshakeRequest handshakeRequest = new HandshakeRequest(input); + handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(this.version), requestId); + } + + TransportResponseHandler removeHandlerForHandshake(long requestId) { return pendingHandshakes.remove(requestId); } @@ -99,7 +104,7 @@ long getNumHandshakes() { return numHandshakes.count(); } - private class HandshakeResponseHandler implements TransportResponseHandler { + private class HandshakeResponseHandler implements TransportResponseHandler { private final long requestId; private final Version currentVersion; @@ -113,12 +118,12 @@ private HandshakeResponseHandler(long requestId, Version currentVersion, ActionL } @Override - public VersionHandshakeResponse read(StreamInput in) throws IOException { - return new VersionHandshakeResponse(in); + public HandshakeResponse read(StreamInput in) throws IOException { + return new HandshakeResponse(in); } @Override - public void handleResponse(VersionHandshakeResponse response) { + public void handleResponse(HandshakeResponse response) { if (isDone.compareAndSet(false, true)) { Version version = response.version; if (currentVersion.isCompatible(version) == false) { @@ -149,19 +154,56 @@ public String executor() { } } - static final class VersionHandshakeResponse extends TransportResponse { + static final class HandshakeRequest extends TransportRequest { + + private static final byte[] EMPTY_ARRAY = new byte[0]; private final Version version; + private final byte[] futureBytes; - VersionHandshakeResponse(Version version) { + HandshakeRequest(Version version) { this.version = version; + this.futureBytes = EMPTY_ARRAY; } - private VersionHandshakeResponse(StreamInput in) throws IOException { + HandshakeRequest(StreamInput streamInput) throws IOException { + super(streamInput); + this.version = Version.readVersion(streamInput); + this.futureBytes = new byte[streamInput.available()]; + streamInput.readBytes(futureBytes, 0, futureBytes.length); + } + + @Override + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + assert version != null; + Version.writeVersion(version, out); + } + } + + static final class HandshakeResponse extends TransportResponse { + + private final Version version; + + HandshakeResponse(Version version) { + this.version = version; + } + + private HandshakeResponse(StreamInput in) throws IOException { super.readFrom(in); version = Version.readVersion(in); } + @Override + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportStatus.java b/server/src/main/java/org/elasticsearch/transport/TransportStatus.java index 2f5f6d6bd9bb5..e20f7a55e1517 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportStatus.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportStatus.java @@ -25,6 +25,7 @@ public final class TransportStatus { private static final byte STATUS_ERROR = 1 << 1; private static final byte STATUS_COMPRESS = 1 << 2; private static final byte STATUS_HANDSHAKE = 1 << 3; + private static final byte STATUS_HANDSHAKE_V2 = 1 << 4; public static boolean isRequest(byte value) { return (value & STATUS_REQRES) == 0; @@ -67,5 +68,12 @@ static byte setHandshake(byte value) { // pkg private since it's only used inter return value; } + static boolean isHandshakeV2(byte value) { // pkg private since it's only used internally + return (value & STATUS_HANDSHAKE_V2) != 0; + } + static byte setHandshakeV2(byte value) { // pkg private since it's only used internally + value |= STATUS_HANDSHAKE_V2; + return value; + } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index ec6860f6adddf..5f0bfdbbf1692 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -81,8 +81,8 @@ public void testHandshakeRequestAndResponse() throws IOException { verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(), eq(reqId)); - TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); - handler.handleResponse((TcpTransportHandshaker.VersionHandshakeResponse) responseCaptor.getValue()); + TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); + handler.handleResponse((TcpTransportHandshaker.HandshakeResponse) responseCaptor.getValue()); assertTrue(versionFuture.isDone()); assertEquals(Version.CURRENT, versionFuture.actionGet()); @@ -97,7 +97,7 @@ public void testHandshakeError() throws IOException { assertFalse(versionFuture.isDone()); - TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); + TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); handler.handleException(new TransportException("failed")); assertTrue(versionFuture.isDone()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 6a06d0f72e14c..f20e566e1e34a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2382,7 +2382,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(45, transportStats.getTxSize().getBytes()); + assertEquals(49, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2392,7 +2392,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(101, transportStats.getTxSize().getBytes()); + assertEquals(105, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2400,7 +2400,7 @@ public String executor() { assertEquals(2, stats.getRxCount()); assertEquals(2, stats.getTxCount()); assertEquals(46, stats.getRxSize().getBytes()); - assertEquals(101, stats.getTxSize().getBytes()); + assertEquals(105, stats.getTxSize().getBytes()); } finally { serviceC.close(); } @@ -2497,7 +2497,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(45, transportStats.getTxSize().getBytes()); + assertEquals(49, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2507,7 +2507,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(101, transportStats.getTxSize().getBytes()); + assertEquals(105, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2522,7 +2522,7 @@ public String executor() { // 49 bytes are the non-exception message bytes that have been received. It should include the initial // handshake message and the header, version, etc bytes in the exception message. assertEquals(failedMessage, 49 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); - assertEquals(101, stats.getTxSize().getBytes()); + assertEquals(105, stats.getTxSize().getBytes()); } finally { serviceC.close(); } From 49f669e1c4b8a95cd14519a0fdd0dc9f40f70c16 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 2 Dec 2018 18:36:55 -0700 Subject: [PATCH 02/13] WIP --- .../elasticsearch/transport/TcpTransport.java | 8 +++--- ...ndshaker.java => TransportHandshaker.java} | 26 +++++++++++++------ .../transport/TransportHandshakerTests.java | 18 ++++++------- .../AbstractSimpleTransportTestCase.java | 14 +++++----- 4 files changed, 38 insertions(+), 28 deletions(-) rename server/src/main/java/org/elasticsearch/transport/{TcpTransportHandshaker.java => TransportHandshaker.java} (88%) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 438008dedbf59..98fc57c47cb5e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -202,7 +202,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final TransportLogger transportLogger; - private final TcpTransportHandshaker handshaker; + private final TransportHandshaker handshaker; private final TransportKeepAlive keepAlive; private final String nodeName; @@ -221,12 +221,12 @@ public TcpTransport(String transportName, Settings settings, Version version, T this.networkService = networkService; this.transportName = transportName; this.transportLogger = new TransportLogger(); - this.handshaker = new TcpTransportHandshaker(version, threadPool, + this.handshaker = new TransportHandshaker(version, threadPool, (node, channel, requestId, v) -> sendRequestToChannel(node, channel, requestId, - TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, new TcpTransportHandshaker.HandshakeRequest(version), + TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version), TransportRequestOptions.EMPTY, v, TransportStatus.setHandshake((byte) 0)), (v, features, channel, response, requestId) -> sendResponse(v, features, channel, response, requestId, - TcpTransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0))); + TransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0))); this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage); this.nodeName = Node.NODE_NAME_SETTING.get(settings); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java similarity index 88% rename from server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java rename to server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 108ca969a7471..9067102e07598 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -21,6 +21,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.metrics.CounterMetric; @@ -37,7 +39,7 @@ * Sends and receives transport-level connection handshakes. This class will send the initial handshake, * manage state/timeouts while the handshake is in transit, and handle the eventual response. */ -final class TcpTransportHandshaker { +final class TransportHandshaker { static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; private final ConcurrentMap pendingHandshakes = new ConcurrentHashMap<>(); @@ -48,8 +50,8 @@ final class TcpTransportHandshaker { private final HandshakeRequestSender handshakeRequestSender; private final HandshakeResponseSender handshakeResponseSender; - TcpTransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender, - HandshakeResponseSender handshakeResponseSender) { + TransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender, + HandshakeResponseSender handshakeResponseSender) { this.version = version; this.threadPool = threadPool; this.handshakeRequestSender = handshakeRequestSender; @@ -159,18 +161,21 @@ static final class HandshakeRequest extends TransportRequest { private static final byte[] EMPTY_ARRAY = new byte[0]; private final Version version; - private final byte[] futureBytes; + private final byte[] futureVersionBytes; HandshakeRequest(Version version) { this.version = version; - this.futureBytes = EMPTY_ARRAY; + this.futureVersionBytes = EMPTY_ARRAY; } HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); + int messageBytes = streamInput.readInt(); + int currentlyAvailable = streamInput.available(); this.version = Version.readVersion(streamInput); - this.futureBytes = new byte[streamInput.available()]; - streamInput.readBytes(futureBytes, 0, futureBytes.length); + int futureBytesLength = messageBytes - (currentlyAvailable - streamInput.available()); + this.futureVersionBytes = new byte[futureBytesLength]; + streamInput.readBytes(futureVersionBytes, 0, futureVersionBytes.length); } @Override @@ -182,7 +187,12 @@ public void readFrom(StreamInput in) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); assert version != null; - Version.writeVersion(version, out); + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(4)) { + Version.writeVersion(version, bytesStreamOutput); + BytesReference reference = bytesStreamOutput.bytes(); + out.writeInt(reference.length()); + reference.writeTo(out); + } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 5f0bfdbbf1692..5d386ddb09e63 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -38,24 +38,24 @@ public class TransportHandshakerTests extends ESTestCase { - private TcpTransportHandshaker handshaker; + private TransportHandshaker handshaker; private DiscoveryNode node; private TcpChannel channel; private TestThreadPool threadPool; - private TcpTransportHandshaker.HandshakeRequestSender requestSender; - private TcpTransportHandshaker.HandshakeResponseSender responseSender; + private TransportHandshaker.HandshakeRequestSender requestSender; + private TransportHandshaker.HandshakeResponseSender responseSender; @Override public void setUp() throws Exception { super.setUp(); String nodeId = "node-id"; channel = mock(TcpChannel.class); - requestSender = mock(TcpTransportHandshaker.HandshakeRequestSender.class); - responseSender = mock(TcpTransportHandshaker.HandshakeResponseSender.class); + requestSender = mock(TransportHandshaker.HandshakeRequestSender.class); + responseSender = mock(TransportHandshaker.HandshakeResponseSender.class); node = new DiscoveryNode(nodeId, nodeId, nodeId, "host", "host_address", buildNewFakeTransportAddress(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); threadPool = new TestThreadPool("thread-poll"); - handshaker = new TcpTransportHandshaker(Version.CURRENT, threadPool, requestSender, responseSender); + handshaker = new TransportHandshaker(Version.CURRENT, threadPool, requestSender, responseSender); } @Override @@ -81,8 +81,8 @@ public void testHandshakeRequestAndResponse() throws IOException { verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(), eq(reqId)); - TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); - handler.handleResponse((TcpTransportHandshaker.HandshakeResponse) responseCaptor.getValue()); + TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); + handler.handleResponse((TransportHandshaker.HandshakeResponse) responseCaptor.getValue()); assertTrue(versionFuture.isDone()); assertEquals(Version.CURRENT, versionFuture.actionGet()); @@ -97,7 +97,7 @@ public void testHandshakeError() throws IOException { assertFalse(versionFuture.isDone()); - TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); + TransportResponseHandler handler = handshaker.removeHandlerForHandshake(reqId); handler.handleException(new TransportException("failed")); assertTrue(versionFuture.isDone()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index f20e566e1e34a..b4283d19968c6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2382,7 +2382,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(49, transportStats.getTxSize().getBytes()); + assertEquals(53, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2392,7 +2392,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(105, transportStats.getTxSize().getBytes()); + assertEquals(109, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2400,7 +2400,7 @@ public String executor() { assertEquals(2, stats.getRxCount()); assertEquals(2, stats.getTxCount()); assertEquals(46, stats.getRxSize().getBytes()); - assertEquals(105, stats.getTxSize().getBytes()); + assertEquals(109, stats.getTxSize().getBytes()); } finally { serviceC.close(); } @@ -2497,7 +2497,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(49, transportStats.getTxSize().getBytes()); + assertEquals(53, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2507,7 +2507,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(105, transportStats.getTxSize().getBytes()); + assertEquals(109, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2521,8 +2521,8 @@ public String executor() { String failedMessage = "Unexpected read bytes size. The transport exception that was received=" + exception; // 49 bytes are the non-exception message bytes that have been received. It should include the initial // handshake message and the header, version, etc bytes in the exception message. - assertEquals(failedMessage, 49 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); - assertEquals(105, stats.getTxSize().getBytes()); + assertEquals(failedMessage, 53 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); + assertEquals(109, stats.getTxSize().getBytes()); } finally { serviceC.close(); } From 3f2e9051d20bd758cfe8f057abf4995e9e1fa4ba Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 2 Dec 2018 18:38:41 -0700 Subject: [PATCH 03/13] WIP --- .../transport/AbstractSimpleTransportTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index b4283d19968c6..a0bf8e7c3f5b2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2521,7 +2521,7 @@ public String executor() { String failedMessage = "Unexpected read bytes size. The transport exception that was received=" + exception; // 49 bytes are the non-exception message bytes that have been received. It should include the initial // handshake message and the header, version, etc bytes in the exception message. - assertEquals(failedMessage, 53 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); + assertEquals(failedMessage, 49 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); assertEquals(109, stats.getTxSize().getBytes()); } finally { serviceC.close(); From a3ce7748e8e03a85014b6423e4a860ddf5ae2ec5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 2 Dec 2018 19:41:04 -0700 Subject: [PATCH 04/13] WIP --- .../transport/TransportHandshaker.java | 21 ++++++++----------- .../transport/TransportHandshakerTests.java | 4 +++- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 9067102e07598..dd9437278ae70 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -85,13 +85,9 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV } } - void handleHandshake(Version version, Set features, TcpChannel channel, long requestId) throws IOException { - handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(this.version), requestId); - } - void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput input) throws IOException { HandshakeRequest handshakeRequest = new HandshakeRequest(input); - handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(this.version), requestId); + handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(handshakeRequest.version, this.version), requestId); } TransportResponseHandler removeHandlerForHandshake(long requestId) { @@ -127,7 +123,7 @@ public HandshakeResponse read(StreamInput in) throws IOException { @Override public void handleResponse(HandshakeResponse response) { if (isDone.compareAndSet(false, true)) { - Version version = response.version; + Version version = response.responseVersion; if (currentVersion.isCompatible(version) == false) { listener.onFailure(new IllegalStateException("Received message from unsupported version: [" + version + "] minimal compatible version is: [" + currentVersion.minimumCompatibilityVersion() + "]")); @@ -198,15 +194,16 @@ public void writeTo(StreamOutput out) throws IOException { static final class HandshakeResponse extends TransportResponse { - private final Version version; + private final Version responseVersion; + private Version requestVersion; - HandshakeResponse(Version version) { - this.version = version; + HandshakeResponse(Version requestVersion, Version responseVersion) { + this.responseVersion = responseVersion; } private HandshakeResponse(StreamInput in) throws IOException { super.readFrom(in); - version = Version.readVersion(in); + responseVersion = Version.readVersion(in); } @Override @@ -217,8 +214,8 @@ public void readFrom(StreamInput in) { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - assert version != null; - Version.writeVersion(version, out); + assert responseVersion != null; + Version.writeVersion(responseVersion, out); } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 5d386ddb09e63..6ff3260ac26fd 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -74,7 +75,8 @@ public void testHandshakeRequestAndResponse() throws IOException { assertFalse(versionFuture.isDone()); TcpChannel mockChannel = mock(TcpChannel.class); - handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId); + StreamInput input = null; + handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(TransportResponse.class); From 4ae5ea1655f101c79a6f28748d865d9ae51eaf66 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 2 Dec 2018 20:56:34 -0700 Subject: [PATCH 05/13] WIP --- .../transport/TransportHandshaker.java | 4 ++ .../transport/TransportHandshakerTests.java | 44 ++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index dd9437278ae70..e10bf9b5a3d99 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -217,6 +217,10 @@ public void writeTo(StreamOutput out) throws IOException { assert responseVersion != null; Version.writeVersion(responseVersion, out); } + + Version getResponseVersion() { + return responseVersion; + } } @FunctionalInterface diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 6ff3260ac26fd..22fcf81356811 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -21,8 +21,10 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.mockito.ArgumentCaptor; @@ -75,7 +77,10 @@ public void testHandshakeRequestAndResponse() throws IOException { assertFalse(versionFuture.isDone()); TcpChannel mockChannel = mock(TcpChannel.class); - StreamInput input = null; + TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + handshakeRequest.writeTo(bytesStreamOutput); + StreamInput input = bytesStreamOutput.bytes().streamInput(); handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input); @@ -90,6 +95,42 @@ public void testHandshakeRequestAndResponse() throws IOException { assertEquals(Version.CURRENT, versionFuture.actionGet()); } + public void testHandshakeRequestFutureVersionsCompatibility() throws IOException { + PlainActionFuture versionFuture = PlainActionFuture.newFuture(); + long reqId = randomLongBetween(1, 10); + handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture); + + verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion()); + + assertFalse(versionFuture.isDone()); + + TcpChannel mockChannel = mock(TcpChannel.class); + TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + handshakeRequest.writeTo(bytesStreamOutput); + BytesStreamOutput taskIdStream = new BytesStreamOutput(); + TaskId.EMPTY_TASK_ID.writeTo(taskIdStream); + int taskIdBytes = taskIdStream.bytes().length(); + int currentPosition = (int) bytesStreamOutput.position(); + bytesStreamOutput.seek(taskIdBytes); + bytesStreamOutput.writeInt((currentPosition - taskIdBytes - 4) + 1024); + bytesStreamOutput.seek(currentPosition); + bytesStreamOutput.write(new byte[1024]); + StreamInput input = bytesStreamOutput.bytes().streamInput(); + assertEquals(1033, input.available()); + handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input); + assertEquals(0, input.available()); + + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(TransportResponse.class); + verify(responseSender).sendResponse(eq(Version.CURRENT), eq(Collections.emptySet()), eq(mockChannel), responseCaptor.capture(), + eq(reqId)); + + TransportHandshaker.HandshakeResponse response = (TransportHandshaker.HandshakeResponse) responseCaptor.getValue(); + + assertEquals(Version.CURRENT, response.getResponseVersion()); + } + public void testHandshakeError() throws IOException { PlainActionFuture versionFuture = PlainActionFuture.newFuture(); long reqId = randomLongBetween(1, 10); @@ -115,7 +156,6 @@ public void testSendRequestThrowsException() throws IOException { handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture); - assertTrue(versionFuture.isDone()); ConnectTransportException cte = expectThrows(ConnectTransportException.class, versionFuture::actionGet); assertThat(cte.getMessage(), containsString("failure to send internal:tcp/handshake")); From 78d4309512c05a639bfc99ced40cc277a69cb7d0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 3 Dec 2018 11:11:20 -0700 Subject: [PATCH 06/13] WIP --- .../transport/TransportHandshaker.java | 7 +++++ .../transport/TransportStatus.java | 10 ------- .../transport/TransportHandshakerTests.java | 30 ++++++++++++++++--- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index e10bf9b5a3d99..a7b2284d83a71 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -154,6 +154,9 @@ public String executor() { static final class HandshakeRequest extends TransportRequest { + // Allow a maximum for 2KB for future handshake message. This 2KB limit excludes any bytes serialized + // by the abstract TransportRequest. + private static final int MAX_HANDSHAKE_REQUEST_BYTES = 1 << 11; private static final byte[] EMPTY_ARRAY = new byte[0]; private final Version version; @@ -167,6 +170,10 @@ static final class HandshakeRequest extends TransportRequest { HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); int messageBytes = streamInput.readInt(); + if (messageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { + throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " + + messageBytes + " bytes."); + } int currentlyAvailable = streamInput.available(); this.version = Version.readVersion(streamInput); int futureBytesLength = messageBytes - (currentlyAvailable - streamInput.available()); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportStatus.java b/server/src/main/java/org/elasticsearch/transport/TransportStatus.java index e20f7a55e1517..0746ed91cfbfc 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportStatus.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportStatus.java @@ -25,7 +25,6 @@ public final class TransportStatus { private static final byte STATUS_ERROR = 1 << 1; private static final byte STATUS_COMPRESS = 1 << 2; private static final byte STATUS_HANDSHAKE = 1 << 3; - private static final byte STATUS_HANDSHAKE_V2 = 1 << 4; public static boolean isRequest(byte value) { return (value & STATUS_REQRES) == 0; @@ -67,13 +66,4 @@ static byte setHandshake(byte value) { // pkg private since it's only used inter value |= STATUS_HANDSHAKE; return value; } - - static boolean isHandshakeV2(byte value) { // pkg private since it's only used internally - return (value & STATUS_HANDSHAKE_V2) != 0; - } - - static byte setHandshakeV2(byte value) { // pkg private since it's only used internally - value |= STATUS_HANDSHAKE_V2; - return value; - } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 22fcf81356811..4520f45de2ed4 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -96,14 +96,11 @@ public void testHandshakeRequestAndResponse() throws IOException { } public void testHandshakeRequestFutureVersionsCompatibility() throws IOException { - PlainActionFuture versionFuture = PlainActionFuture.newFuture(); long reqId = randomLongBetween(1, 10); - handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), versionFuture); + handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture()); verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion()); - assertFalse(versionFuture.isDone()); - TcpChannel mockChannel = mock(TcpChannel.class); TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); @@ -131,6 +128,31 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException assertEquals(Version.CURRENT, response.getResponseVersion()); } + public void testHandshakeRequestToLarge() throws IOException { + long reqId = randomLongBetween(1, 10); + handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture()); + + verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion()); + + TcpChannel mockChannel = mock(TcpChannel.class); + TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + handshakeRequest.writeTo(bytesStreamOutput); + BytesStreamOutput taskIdStream = new BytesStreamOutput(); + TaskId.EMPTY_TASK_ID.writeTo(taskIdStream); + int taskIdBytes = taskIdStream.bytes().length(); + int currentPosition = (int) bytesStreamOutput.position(); + bytesStreamOutput.seek(taskIdBytes); + bytesStreamOutput.writeInt((currentPosition - taskIdBytes - 4) + 2048); + bytesStreamOutput.seek(currentPosition); + bytesStreamOutput.write(new byte[2048]); + StreamInput input = bytesStreamOutput.bytes().streamInput(); + assertEquals(2057, input.available()); + IOException ioException = expectThrows(IOException.class, + () -> handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input)); + assertEquals("Handshake request limited to 2048 bytes. Found 2052 bytes.", ioException.getMessage()); + } + public void testHandshakeError() throws IOException { PlainActionFuture versionFuture = PlainActionFuture.newFuture(); long reqId = randomLongBetween(1, 10); From ff53b818b75e0780a90b07f19ad946b5322e5964 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 3 Dec 2018 11:46:25 -0700 Subject: [PATCH 07/13] Fix checkstyle --- .../java/org/elasticsearch/transport/TransportHandshaker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index a7b2284d83a71..8c1a5e9e59536 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -87,7 +87,8 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput input) throws IOException { HandshakeRequest handshakeRequest = new HandshakeRequest(input); - handshakeResponseSender.sendResponse(version, features, channel, new HandshakeResponse(handshakeRequest.version, this.version), requestId); + HandshakeResponse response = new HandshakeResponse(handshakeRequest.version, this.version); + handshakeResponseSender.sendResponse(version, features, channel, response, requestId); } TransportResponseHandler removeHandlerForHandshake(long requestId) { From 21063be75b76e3ff0e6bfa31fd86285722ffb583 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 3 Dec 2018 12:03:40 -0700 Subject: [PATCH 08/13] WIP --- .../transport/TransportHandshaker.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 8c1a5e9e59536..6390daad50088 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -155,31 +156,30 @@ public String executor() { static final class HandshakeRequest extends TransportRequest { - // Allow a maximum for 2KB for future handshake message. This 2KB limit excludes any bytes serialized - // by the abstract TransportRequest. + // Allow a maximum of 2KB for future handshake request versions. This 2KB limit excludes any bytes + // serialized by the abstract TransportRequest. private static final int MAX_HANDSHAKE_REQUEST_BYTES = 1 << 11; - private static final byte[] EMPTY_ARRAY = new byte[0]; private final Version version; - private final byte[] futureVersionBytes; HandshakeRequest(Version version) { this.version = version; - this.futureVersionBytes = EMPTY_ARRAY; } HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); int messageBytes = streamInput.readInt(); + if (messageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " + messageBytes + " bytes."); } - int currentlyAvailable = streamInput.available(); - this.version = Version.readVersion(streamInput); - int futureBytesLength = messageBytes - (currentlyAvailable - streamInput.available()); - this.futureVersionBytes = new byte[futureBytesLength]; - streamInput.readBytes(futureVersionBytes, 0, futureVersionBytes.length); + byte[] messageByteArray = new byte[messageBytes]; + streamInput.readBytes(messageByteArray, 0, messageByteArray.length); + BytesArray bytesArray = new BytesArray(messageByteArray); + try (StreamInput messageStreamInput = bytesArray.streamInput()) { + this.version = Version.readVersion(messageStreamInput); + } } @Override From 212b30e0c8aae41d87e099e9a8187edfa8d4113e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 3 Dec 2018 12:09:24 -0700 Subject: [PATCH 09/13] WIP --- .../transport/TransportHandshaker.java | 26 +++++++++---------- .../transport/TransportHandshakerTests.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 6390daad50088..771797dfc8be2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -168,16 +168,16 @@ static final class HandshakeRequest extends TransportRequest { HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); - int messageBytes = streamInput.readInt(); + int remainingMessageBytes = streamInput.readInt(); - if (messageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { + int totalMessageBytes = remainingMessageBytes + 4; + if (totalMessageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " - + messageBytes + " bytes."); + + totalMessageBytes + " bytes."); } - byte[] messageByteArray = new byte[messageBytes]; + byte[] messageByteArray = new byte[remainingMessageBytes]; streamInput.readBytes(messageByteArray, 0, messageByteArray.length); - BytesArray bytesArray = new BytesArray(messageByteArray); - try (StreamInput messageStreamInput = bytesArray.streamInput()) { + try (StreamInput messageStreamInput = new BytesArray(messageByteArray).streamInput()) { this.version = Version.readVersion(messageStreamInput); } } @@ -188,14 +188,14 @@ public void readFrom(StreamInput in) { } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + public void writeTo(StreamOutput streamOutput) throws IOException { + super.writeTo(streamOutput); assert version != null; - try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(4)) { - Version.writeVersion(version, bytesStreamOutput); - BytesReference reference = bytesStreamOutput.bytes(); - out.writeInt(reference.length()); - reference.writeTo(out); + try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput(4)) { + Version.writeVersion(version, messageStreamOutput); + BytesReference reference = messageStreamOutput.bytes(); + streamOutput.writeInt(reference.length()); + reference.writeTo(streamOutput); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 4520f45de2ed4..2b7f8e6c61c7a 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -150,7 +150,7 @@ public void testHandshakeRequestToLarge() throws IOException { assertEquals(2057, input.available()); IOException ioException = expectThrows(IOException.class, () -> handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input)); - assertEquals("Handshake request limited to 2048 bytes. Found 2052 bytes.", ioException.getMessage()); + assertEquals("Handshake request limited to 2048 bytes. Found 2056 bytes.", ioException.getMessage()); } public void testHandshakeError() throws IOException { From 6ee3b7a237d4fd325771b9d7b31d5099d9489763 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 3 Dec 2018 15:05:00 -0700 Subject: [PATCH 10/13] Fix tests --- .../transport/netty4/ByteBufStreamInput.java | 6 +++- .../elasticsearch/http/nio/ByteBufUtils.java | 6 +++- .../transport/TransportHandshaker.java | 32 ++++++++++++------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java index 2713f34308575..6fe631aeab7bb 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java @@ -132,7 +132,11 @@ public int skipBytes(int n) throws IOException { @Override public byte readByte() throws IOException { - return buffer.readByte(); + try { + return buffer.readByte(); + } catch (IndexOutOfBoundsException ex) { + throw new EOFException(); + } } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 8f2908eae8563..9574a044fdfcf 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -238,7 +238,11 @@ public int skipBytes(int n) throws IOException { @Override public byte readByte() throws IOException { - return buffer.readByte(); + try { + return buffer.readByte(); + } catch (IndexOutOfBoundsException ex) { + throw new EOFException(); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 771797dfc8be2..f1ea494ca765a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; +import java.io.EOFException; import java.io.IOException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -87,8 +88,7 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV } void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput input) throws IOException { - HandshakeRequest handshakeRequest = new HandshakeRequest(input); - HandshakeResponse response = new HandshakeResponse(handshakeRequest.version, this.version); + HandshakeResponse response = new HandshakeResponse(new HandshakeRequest(input).version, this.version); handshakeResponseSender.sendResponse(version, features, channel, response, requestId); } @@ -168,17 +168,25 @@ static final class HandshakeRequest extends TransportRequest { HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); - int remainingMessageBytes = streamInput.readInt(); - - int totalMessageBytes = remainingMessageBytes + 4; - if (totalMessageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { - throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " - + totalMessageBytes + " bytes."); + int remainingMessageBytes; + try { + remainingMessageBytes = streamInput.readInt(); + } catch (EOFException e) { + remainingMessageBytes = -1; } - byte[] messageByteArray = new byte[remainingMessageBytes]; - streamInput.readBytes(messageByteArray, 0, messageByteArray.length); - try (StreamInput messageStreamInput = new BytesArray(messageByteArray).streamInput()) { - this.version = Version.readVersion(messageStreamInput); + if (remainingMessageBytes == -1) { + version = null; + } else { + int totalMessageBytes = remainingMessageBytes + 4; + if (totalMessageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { + throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " + + totalMessageBytes + " bytes."); + } + byte[] messageByteArray = new byte[remainingMessageBytes]; + streamInput.readFully(messageByteArray); + try (StreamInput messageStreamInput = new BytesArray(messageByteArray).streamInput()) { + this.version = Version.readVersion(messageStreamInput); + } } } From 5681a8f60807e3b2cf22bf939a98a5ba1e85640e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 7 Dec 2018 18:09:35 -0700 Subject: [PATCH 11/13] Changes --- .../transport/netty4/ByteBufStreamInput.java | 4 ++- .../elasticsearch/http/nio/ByteBufUtils.java | 4 ++- .../transport/TransportHandshaker.java | 23 ++++++++--------- .../transport/TransportHandshakerTests.java | 25 ------------------- 4 files changed, 16 insertions(+), 40 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java index 6fe631aeab7bb..4f0917fd99aaa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ByteBufStreamInput.java @@ -135,7 +135,9 @@ public byte readByte() throws IOException { try { return buffer.readByte(); } catch (IndexOutOfBoundsException ex) { - throw new EOFException(); + EOFException eofException = new EOFException(); + eofException.initCause(ex); + throw eofException; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 9574a044fdfcf..1f18049514f60 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -241,7 +241,9 @@ public byte readByte() throws IOException { try { return buffer.readByte(); } catch (IndexOutOfBoundsException ex) { - throw new EOFException(); + EOFException eofException = new EOFException(); + eofException.initCause(ex); + throw eofException; } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index f1ea494ca765a..e3cd514d30a8c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -87,8 +87,15 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV } } - void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput input) throws IOException { - HandshakeResponse response = new HandshakeResponse(new HandshakeRequest(input).version, this.version); + void handleHandshake(Version version, Set features, TcpChannel channel, long requestId, StreamInput stream) throws IOException { + // Must read the handshake request to exhaust the stream + HandshakeRequest handshakeRequest = new HandshakeRequest(stream); + final int nextByte = stream.read(); + if (nextByte != -1) { + throw new IllegalStateException("Handshake request not fully read for requestId [" + requestId + "], action [" + + TransportHandshaker.HANDSHAKE_ACTION_NAME + "], available [" + stream.available() + "]; resetting"); + } + HandshakeResponse response = new HandshakeResponse(this.version); handshakeResponseSender.sendResponse(version, features, channel, response, requestId); } @@ -156,10 +163,6 @@ public String executor() { static final class HandshakeRequest extends TransportRequest { - // Allow a maximum of 2KB for future handshake request versions. This 2KB limit excludes any bytes - // serialized by the abstract TransportRequest. - private static final int MAX_HANDSHAKE_REQUEST_BYTES = 1 << 11; - private final Version version; HandshakeRequest(Version version) { @@ -177,11 +180,6 @@ static final class HandshakeRequest extends TransportRequest { if (remainingMessageBytes == -1) { version = null; } else { - int totalMessageBytes = remainingMessageBytes + 4; - if (totalMessageBytes > MAX_HANDSHAKE_REQUEST_BYTES) { - throw new IOException("Handshake request limited to " + MAX_HANDSHAKE_REQUEST_BYTES + " bytes. Found " - + totalMessageBytes + " bytes."); - } byte[] messageByteArray = new byte[remainingMessageBytes]; streamInput.readFully(messageByteArray); try (StreamInput messageStreamInput = new BytesArray(messageByteArray).streamInput()) { @@ -211,9 +209,8 @@ public void writeTo(StreamOutput streamOutput) throws IOException { static final class HandshakeResponse extends TransportResponse { private final Version responseVersion; - private Version requestVersion; - HandshakeResponse(Version requestVersion, Version responseVersion) { + HandshakeResponse(Version responseVersion) { this.responseVersion = responseVersion; } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 2b7f8e6c61c7a..8344388f739af 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -128,31 +128,6 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException assertEquals(Version.CURRENT, response.getResponseVersion()); } - public void testHandshakeRequestToLarge() throws IOException { - long reqId = randomLongBetween(1, 10); - handshaker.sendHandshake(reqId, node, channel, new TimeValue(30, TimeUnit.SECONDS), PlainActionFuture.newFuture()); - - verify(requestSender).sendRequest(node, channel, reqId, Version.CURRENT.minimumCompatibilityVersion()); - - TcpChannel mockChannel = mock(TcpChannel.class); - TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - handshakeRequest.writeTo(bytesStreamOutput); - BytesStreamOutput taskIdStream = new BytesStreamOutput(); - TaskId.EMPTY_TASK_ID.writeTo(taskIdStream); - int taskIdBytes = taskIdStream.bytes().length(); - int currentPosition = (int) bytesStreamOutput.position(); - bytesStreamOutput.seek(taskIdBytes); - bytesStreamOutput.writeInt((currentPosition - taskIdBytes - 4) + 2048); - bytesStreamOutput.seek(currentPosition); - bytesStreamOutput.write(new byte[2048]); - StreamInput input = bytesStreamOutput.bytes().streamInput(); - assertEquals(2057, input.available()); - IOException ioException = expectThrows(IOException.class, - () -> handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input)); - assertEquals("Handshake request limited to 2048 bytes. Found 2056 bytes.", ioException.getMessage()); - } - public void testHandshakeError() throws IOException { PlainActionFuture versionFuture = PlainActionFuture.newFuture(); long reqId = randomLongBetween(1, 10); From f684c1fdb781501dc6a72643536ed556408f7904 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 11 Dec 2018 13:43:56 -0700 Subject: [PATCH 12/13] Changes from review --- .../transport/TransportHandshaker.java | 16 ++++----- .../transport/TransportHandshakerTests.java | 34 +++++++++++-------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index e3cd514d30a8c..3497b29d6d0d7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -171,18 +170,16 @@ static final class HandshakeRequest extends TransportRequest { HandshakeRequest(StreamInput streamInput) throws IOException { super(streamInput); - int remainingMessageBytes; + BytesReference remainingMessage; try { - remainingMessageBytes = streamInput.readInt(); + remainingMessage = streamInput.readBytesReference(); } catch (EOFException e) { - remainingMessageBytes = -1; + remainingMessage = null; } - if (remainingMessageBytes == -1) { + if (remainingMessage == null) { version = null; } else { - byte[] messageByteArray = new byte[remainingMessageBytes]; - streamInput.readFully(messageByteArray); - try (StreamInput messageStreamInput = new BytesArray(messageByteArray).streamInput()) { + try (StreamInput messageStreamInput = remainingMessage.streamInput()) { this.version = Version.readVersion(messageStreamInput); } } @@ -200,8 +197,7 @@ public void writeTo(StreamOutput streamOutput) throws IOException { try (BytesStreamOutput messageStreamOutput = new BytesStreamOutput(4)) { Version.writeVersion(version, messageStreamOutput); BytesReference reference = messageStreamOutput.bytes(); - streamOutput.writeInt(reference.length()); - reference.writeTo(streamOutput); + streamOutput.writeBytesReference(reference); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java index 8344388f739af..0b5e52009cecd 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportHandshakerTests.java @@ -103,20 +103,26 @@ public void testHandshakeRequestFutureVersionsCompatibility() throws IOException TcpChannel mockChannel = mock(TcpChannel.class); TransportHandshaker.HandshakeRequest handshakeRequest = new TransportHandshaker.HandshakeRequest(Version.CURRENT); - BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); - handshakeRequest.writeTo(bytesStreamOutput); - BytesStreamOutput taskIdStream = new BytesStreamOutput(); - TaskId.EMPTY_TASK_ID.writeTo(taskIdStream); - int taskIdBytes = taskIdStream.bytes().length(); - int currentPosition = (int) bytesStreamOutput.position(); - bytesStreamOutput.seek(taskIdBytes); - bytesStreamOutput.writeInt((currentPosition - taskIdBytes - 4) + 1024); - bytesStreamOutput.seek(currentPosition); - bytesStreamOutput.write(new byte[1024]); - StreamInput input = bytesStreamOutput.bytes().streamInput(); - assertEquals(1033, input.available()); - handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, input); - assertEquals(0, input.available()); + BytesStreamOutput currentHandshakeBytes = new BytesStreamOutput(); + handshakeRequest.writeTo(currentHandshakeBytes); + + BytesStreamOutput lengthCheckingHandshake = new BytesStreamOutput(); + BytesStreamOutput futureHandshake = new BytesStreamOutput(); + TaskId.EMPTY_TASK_ID.writeTo(lengthCheckingHandshake); + TaskId.EMPTY_TASK_ID.writeTo(futureHandshake); + try (BytesStreamOutput internalMessage = new BytesStreamOutput()) { + Version.writeVersion(Version.CURRENT, internalMessage); + lengthCheckingHandshake.writeBytesReference(internalMessage.bytes()); + internalMessage.write(new byte[1024]); + futureHandshake.writeBytesReference(internalMessage.bytes()); + } + StreamInput futureHandshakeStream = futureHandshake.bytes().streamInput(); + // We check that the handshake we serialize for this test equals the actual request. + // Otherwise, we need to update the test. + assertEquals(currentHandshakeBytes.bytes().length(), lengthCheckingHandshake.bytes().length()); + assertEquals(1031, futureHandshakeStream.available()); + handshaker.handleHandshake(Version.CURRENT, Collections.emptySet(), mockChannel, reqId, futureHandshakeStream); + assertEquals(0, futureHandshakeStream.available()); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(TransportResponse.class); From affe58aa095a82543db1267c0203f7cf9f892b30 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 11 Dec 2018 14:00:08 -0700 Subject: [PATCH 13/13] Fix test --- .../transport/AbstractSimpleTransportTestCase.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index a0bf8e7c3f5b2..b2e468a9b2505 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2382,7 +2382,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(53, transportStats.getTxSize().getBytes()); + assertEquals(50, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2392,7 +2392,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(109, transportStats.getTxSize().getBytes()); + assertEquals(106, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2400,7 +2400,7 @@ public String executor() { assertEquals(2, stats.getRxCount()); assertEquals(2, stats.getTxCount()); assertEquals(46, stats.getRxSize().getBytes()); - assertEquals(109, stats.getTxSize().getBytes()); + assertEquals(106, stats.getTxSize().getBytes()); } finally { serviceC.close(); } @@ -2497,7 +2497,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(1, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(53, transportStats.getTxSize().getBytes()); + assertEquals(50, transportStats.getTxSize().getBytes()); }); serviceC.sendRequest(connection, "internal:action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, transportResponseHandler); @@ -2507,7 +2507,7 @@ public String executor() { assertEquals(1, transportStats.getRxCount()); assertEquals(2, transportStats.getTxCount()); assertEquals(25, transportStats.getRxSize().getBytes()); - assertEquals(109, transportStats.getTxSize().getBytes()); + assertEquals(106, transportStats.getTxSize().getBytes()); }); sendResponseLatch.countDown(); responseLatch.await(); @@ -2522,7 +2522,7 @@ public String executor() { // 49 bytes are the non-exception message bytes that have been received. It should include the initial // handshake message and the header, version, etc bytes in the exception message. assertEquals(failedMessage, 49 + streamOutput.bytes().length(), stats.getRxSize().getBytes()); - assertEquals(109, stats.getTxSize().getBytes()); + assertEquals(106, stats.getTxSize().getBytes()); } finally { serviceC.close(); }