From 5b0d98c7435ad6b3b29fbde273b032fac448e3d3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 6 Jul 2021 15:31:47 -0600 Subject: [PATCH] Ensure replica requests are marked as index_data (#75008) This is related to #73497. Currently replica requests are wrapped in a concrete replica shard request. This leads to the transport layer not properly identifying them as replica index_data requests and not compressing them properly. This commit resolves this bug. --- .../TransportReplicationAction.java | 12 ++- .../RawIndexingDataTransportRequest.java | 8 +- .../elasticsearch/transport/TcpTransport.java | 7 +- .../AbstractSimpleTransportTestCase.java | 84 ++++++++++++++++++- 4 files changed, 106 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e4f1222e9a440..0e02f579c3b88 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -61,6 +61,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RawIndexingDataTransportRequest; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -1098,7 +1099,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l } /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ - public static class ConcreteShardRequest extends TransportRequest { + public static class ConcreteShardRequest extends TransportRequest + implements RawIndexingDataTransportRequest { /** {@link AllocationId#getId()} of the shard this request is sent to **/ private final String targetAllocationID; @@ -1189,6 +1191,14 @@ public long getPrimaryTerm() { return primaryTerm; } + @Override + public boolean isRawIndexingData() { + if (request instanceof RawIndexingDataTransportRequest) { + return ((RawIndexingDataTransportRequest) request).isRawIndexingData(); + } + return false; + } + @Override public String toString() { return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm; diff --git a/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java b/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java index 8969bd6a43fad..9610f52ae8182 100644 --- a/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java @@ -10,8 +10,12 @@ /** * Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS} - * is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primary intended to be - * requests/responses primarily composed of raw source data. + * is configured to {@link Compression.Enabled#INDEXING_DATA} and isRawIndexingData() returns true. This is + * intended to be requests/responses primarily composed of raw source data. */ public interface RawIndexingDataTransportRequest { + + default boolean isRawIndexingData() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 139d804bf9bd9..d46cc9e6eacf9 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -258,8 +258,13 @@ public void sendRequest(long requestId, String action, TransportRequest request, throw new NodeNotConnectedException(node, "connection already closed"); } TcpChannel channel = channel(options.type()); + // We compress if total transport compression is enabled or if indexing_data transport compression + // is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be + // compressed. boolean shouldCompress = compress == Compression.Enabled.TRUE || - (compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest); + (compress == Compression.Enabled.INDEXING_DATA + && request instanceof RawIndexingDataTransportRequest + && ((RawIndexingDataTransportRequest) request).isRawIndexingData()); outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 660dd6d94fdcf..addb87a6d807f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -74,6 +74,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -87,8 +88,10 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -651,6 +654,74 @@ public void handleException(TransportException exp) { } } + public void testIndexingDataCompression() throws Exception { + try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) { + String component = "cccccccccooooooooooooooommmmmmmmmmmppppppppppprrrrrrrreeeeeeeeeessssssssiiiiiiiiiibbbbbbbbllllllllleeeeee"; + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 30; ++i) { + builder.append(component); + } + String text = builder.toString(); + TransportRequestHandler handler = (request, channel, task) -> { + assertThat(text, equalTo(request.message)); + try { + channel.sendResponse(new StringMessageResponse("")); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } + }; + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler); + serviceC.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler); + + Settings settingsWithCompress = Settings.builder() + .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA) + .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), + randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)) + .build(); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress); + serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile); + serviceA.connectToNode(serviceC.getLocalDiscoNode(), connectionProfile); + + TransportResponseHandler responseHandler = new TransportResponseHandler() { + @Override + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void handleResponse(StringMessageResponse response) { + } + + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }; + + Future compressed = serviceC.submitRequest(serviceA.getLocalDiscoNode(), "internal:sayHello", + new StringMessageRequest(text, -1, true), responseHandler); + Future uncompressed = serviceA.submitRequest(serviceC.getLocalDiscoNode(), "internal:sayHello", + new StringMessageRequest(text, -1, false), responseHandler); + + compressed.get(); + uncompressed.get(); + final long bytesLength; + try (BytesStreamOutput output = new BytesStreamOutput()) { + new StringMessageRequest(text, -1).writeTo(output); + bytesLength = output.bytes().length(); + } + assertThat(serviceA.transport().getStats().getRxSize().getBytes(), lessThan(bytesLength)); + assertThat(serviceC.transport().getStats().getRxSize().getBytes(), greaterThan(bytesLength)); + } + } + public void testErrorMessage() { serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { @@ -1188,14 +1259,20 @@ public void handleException(TransportException exp) { } } - public static class StringMessageRequest extends TransportRequest { + public static class StringMessageRequest extends TransportRequest implements RawIndexingDataTransportRequest { private String message; private long timeout; + private boolean isRawIndexingData = false; StringMessageRequest(String message, long timeout) { + this(message, timeout, false); + } + + StringMessageRequest(String message, long timeout, boolean isRawIndexingData) { this.message = message; this.timeout = timeout; + this.isRawIndexingData = isRawIndexingData; } public StringMessageRequest(StreamInput in) throws IOException { @@ -1212,6 +1289,11 @@ public long timeout() { return timeout; } + @Override + public boolean isRawIndexingData() { + return isRawIndexingData; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out);