Skip to content

Commit d3eb540

Browse files
authored
Ensure replica requests are marked as index_data (#75008)
This is related to #73497. Currently replica requests are wrapped in a concrete replica shard request. This leads to the transport layer not properly identifying them as replica index_data requests and not compressing them properly. This commit resolves this bug.
1 parent d56ebc7 commit d3eb540

File tree

4 files changed

+104
-8
lines changed

4 files changed

+104
-8
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.elasticsearch.tasks.TaskId;
6161
import org.elasticsearch.threadpool.ThreadPool;
6262
import org.elasticsearch.transport.ConnectTransportException;
63+
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
6364
import org.elasticsearch.transport.TransportChannel;
6465
import org.elasticsearch.transport.TransportException;
6566
import org.elasticsearch.transport.TransportRequest;
@@ -1097,7 +1098,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l
10971098
}
10981099

10991100
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
1100-
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {
1101+
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
1102+
implements RawIndexingDataTransportRequest {
11011103

11021104
/** {@link AllocationId#getId()} of the shard this request is sent to **/
11031105
private final String targetAllocationID;
@@ -1188,6 +1190,14 @@ public long getPrimaryTerm() {
11881190
return primaryTerm;
11891191
}
11901192

1193+
@Override
1194+
public boolean isRawIndexingData() {
1195+
if (request instanceof RawIndexingDataTransportRequest) {
1196+
return ((RawIndexingDataTransportRequest) request).isRawIndexingData();
1197+
}
1198+
return false;
1199+
}
1200+
11911201
@Override
11921202
public String toString() {
11931203
return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm;

server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010

1111
/**
1212
* Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS}
13-
* is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primary intended to be
14-
* requests/responses primarily composed of raw source data.
13+
* is configured to {@link Compression.Enabled#INDEXING_DATA} and isRawIndexingData() returns true. This is
14+
* intended to be requests/responses primarily composed of raw source data.
1515
*/
1616
public interface RawIndexingDataTransportRequest {
17+
18+
default boolean isRawIndexingData() {
19+
return true;
20+
}
1721
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,13 @@ public void sendRequest(long requestId, String action, TransportRequest request,
243243
throw new NodeNotConnectedException(node, "connection already closed");
244244
}
245245
TcpChannel channel = channel(options.type());
246+
// We compress if total transport compression is enabled or if indexing_data transport compression
247+
// is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be
248+
// compressed.
246249
boolean shouldCompress = compress == Compression.Enabled.TRUE ||
247-
(compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest);
250+
(compress == Compression.Enabled.INDEXING_DATA
251+
&& request instanceof RawIndexingDataTransportRequest
252+
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());
248253
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false);
249254
}
250255

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.elasticsearch.action.StepListener;
2323
import org.elasticsearch.action.support.PlainActionFuture;
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
25-
import org.elasticsearch.core.Nullable;
26-
import org.elasticsearch.core.SuppressForbidden;
2725
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2826
import org.elasticsearch.common.io.stream.StreamInput;
2927
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -36,9 +34,11 @@
3634
import org.elasticsearch.common.settings.Settings;
3735
import org.elasticsearch.common.transport.BoundTransportAddress;
3836
import org.elasticsearch.common.transport.TransportAddress;
39-
import org.elasticsearch.core.TimeValue;
4037
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4138
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
39+
import org.elasticsearch.core.Nullable;
40+
import org.elasticsearch.core.SuppressForbidden;
41+
import org.elasticsearch.core.TimeValue;
4242
import org.elasticsearch.core.internal.io.IOUtils;
4343
import org.elasticsearch.mocksocket.MockServerSocket;
4444
import org.elasticsearch.node.Node;
@@ -89,8 +89,10 @@
8989
import static org.hamcrest.Matchers.containsString;
9090
import static org.hamcrest.Matchers.empty;
9191
import static org.hamcrest.Matchers.equalTo;
92+
import static org.hamcrest.Matchers.greaterThan;
9293
import static org.hamcrest.Matchers.hasToString;
9394
import static org.hamcrest.Matchers.instanceOf;
95+
import static org.hamcrest.Matchers.lessThan;
9496
import static org.hamcrest.Matchers.not;
9597
import static org.hamcrest.Matchers.notNullValue;
9698
import static org.hamcrest.Matchers.nullValue;
@@ -639,6 +641,70 @@ public void handleException(TransportException exp) {
639641
}
640642
}
641643

644+
public void testIndexingDataCompression() throws Exception {
645+
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
646+
String component = "cccccccccooooooooooooooommmmmmmmmmmppppppppppprrrrrrrreeeeeeeeeessssssssiiiiiiiiiibbbbbbbbllllllllleeeeee";
647+
String text = component.repeat(30);
648+
TransportRequestHandler<StringMessageRequest> handler = (request, channel, task) -> {
649+
assertThat(text, equalTo(request.message));
650+
try {
651+
channel.sendResponse(new StringMessageResponse(""));
652+
} catch (IOException e) {
653+
logger.error("Unexpected failure", e);
654+
fail(e.getMessage());
655+
}
656+
};
657+
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);
658+
serviceC.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);
659+
660+
Settings settingsWithCompress = Settings.builder()
661+
.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA)
662+
.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
663+
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
664+
.build();
665+
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
666+
connectToNode(serviceC, serviceA.getLocalDiscoNode(), connectionProfile);
667+
connectToNode(serviceA, serviceC.getLocalDiscoNode(), connectionProfile);
668+
669+
TransportResponseHandler<StringMessageResponse> responseHandler = new TransportResponseHandler<>() {
670+
@Override
671+
public StringMessageResponse read(StreamInput in) throws IOException {
672+
return new StringMessageResponse(in);
673+
}
674+
675+
@Override
676+
public String executor() {
677+
return ThreadPool.Names.GENERIC;
678+
}
679+
680+
@Override
681+
public void handleResponse(StringMessageResponse response) {
682+
}
683+
684+
@Override
685+
public void handleException(TransportException exp) {
686+
logger.error("Unexpected failure", exp);
687+
fail("got exception instead of a response: " + exp.getMessage());
688+
}
689+
};
690+
691+
Future<StringMessageResponse> compressed = submitRequest(serviceC, serviceA.getLocalDiscoNode(), "internal:sayHello",
692+
new StringMessageRequest(text, -1, true), responseHandler);
693+
Future<StringMessageResponse> uncompressed = submitRequest(serviceA, serviceC.getLocalDiscoNode(), "internal:sayHello",
694+
new StringMessageRequest(text, -1, false), responseHandler);
695+
696+
compressed.get();
697+
uncompressed.get();
698+
final long bytesLength;
699+
try (BytesStreamOutput output = new BytesStreamOutput()) {
700+
new StringMessageRequest(text, -1).writeTo(output);
701+
bytesLength = output.bytes().length();
702+
}
703+
assertThat(serviceA.transport().getStats().getRxSize().getBytes(), lessThan(bytesLength));
704+
assertThat(serviceC.transport().getStats().getRxSize().getBytes(), greaterThan(bytesLength));
705+
}
706+
}
707+
642708
public void testErrorMessage() throws InterruptedException {
643709
serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new,
644710
(request, channel, task) -> {
@@ -1150,14 +1216,20 @@ public void handleException(TransportException exp) {
11501216
}
11511217
}
11521218

1153-
public static class StringMessageRequest extends TransportRequest {
1219+
public static class StringMessageRequest extends TransportRequest implements RawIndexingDataTransportRequest {
11541220

11551221
private String message;
11561222
private long timeout;
1223+
private boolean isRawIndexingData = false;
11571224

11581225
StringMessageRequest(String message, long timeout) {
1226+
this(message, timeout, false);
1227+
}
1228+
1229+
StringMessageRequest(String message, long timeout, boolean isRawIndexingData) {
11591230
this.message = message;
11601231
this.timeout = timeout;
1232+
this.isRawIndexingData = isRawIndexingData;
11611233
}
11621234

11631235
public StringMessageRequest(StreamInput in) throws IOException {
@@ -1174,6 +1246,11 @@ public long timeout() {
11741246
return timeout;
11751247
}
11761248

1249+
@Override
1250+
public boolean isRawIndexingData() {
1251+
return isRawIndexingData;
1252+
}
1253+
11771254
@Override
11781255
public void writeTo(StreamOutput out) throws IOException {
11791256
super.writeTo(out);

0 commit comments

Comments
 (0)