Skip to content

Commit 4247bef

Browse files
committed
Ensure replica requests are marked as index_data (#75014)
This is related to #73497. Currently replica requests are wrapped in a concrete replica shard request. This leads to the transport layer not properly identifying them as replica index_data requests and not compressing them properly. This commit resolves this bug.
1 parent 851c601 commit 4247bef

File tree

4 files changed

+106
-5
lines changed

4 files changed

+106
-5
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.tasks.TaskId;
6262
import org.elasticsearch.threadpool.ThreadPool;
6363
import org.elasticsearch.transport.ConnectTransportException;
64+
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
6465
import org.elasticsearch.transport.TransportChannel;
6566
import org.elasticsearch.transport.TransportException;
6667
import org.elasticsearch.transport.TransportRequest;
@@ -1098,7 +1099,8 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l
10981099
}
10991100

11001101
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
1101-
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {
1102+
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
1103+
implements RawIndexingDataTransportRequest {
11021104

11031105
/** {@link AllocationId#getId()} of the shard this request is sent to **/
11041106
private final String targetAllocationID;
@@ -1189,6 +1191,14 @@ public long getPrimaryTerm() {
11891191
return primaryTerm;
11901192
}
11911193

1194+
@Override
1195+
public boolean isRawIndexingData() {
1196+
if (request instanceof RawIndexingDataTransportRequest) {
1197+
return ((RawIndexingDataTransportRequest) request).isRawIndexingData();
1198+
}
1199+
return false;
1200+
}
1201+
11921202
@Override
11931203
public String toString() {
11941204
return "request: " + request + ", target allocation id: " + targetAllocationID + ", primary term: " + primaryTerm;

server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010

1111
/**
1212
* Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS}
13-
* is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primary intended to be
14-
* requests/responses primarily composed of raw source data.
13+
* is configured to {@link Compression.Enabled#INDEXING_DATA} and isRawIndexingData() returns true. This is
14+
* intended to be requests/responses primarily composed of raw source data.
1515
*/
1616
public interface RawIndexingDataTransportRequest {
17+
18+
default boolean isRawIndexingData() {
19+
return true;
20+
}
1721
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,13 @@ public void sendRequest(long requestId, String action, TransportRequest request,
258258
throw new NodeNotConnectedException(node, "connection already closed");
259259
}
260260
TcpChannel channel = channel(options.type());
261+
// We compress if total transport compression is enabled or if indexing_data transport compression
262+
// is enabled and the request is a RawIndexingDataTransportRequest which indicates it should be
263+
// compressed.
261264
boolean shouldCompress = compress == Compression.Enabled.TRUE ||
262-
(compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest);
265+
(compress == Compression.Enabled.INDEXING_DATA
266+
&& request instanceof RawIndexingDataTransportRequest
267+
&& ((RawIndexingDataTransportRequest) request).isRawIndexingData());
263268
outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false);
264269
}
265270

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.util.concurrent.CountDownLatch;
7575
import java.util.concurrent.CyclicBarrier;
7676
import java.util.concurrent.ExecutionException;
77+
import java.util.concurrent.Future;
7778
import java.util.concurrent.Semaphore;
7879
import java.util.concurrent.TimeUnit;
7980
import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,8 +88,10 @@
8788
import static org.hamcrest.Matchers.containsString;
8889
import static org.hamcrest.Matchers.empty;
8990
import static org.hamcrest.Matchers.equalTo;
91+
import static org.hamcrest.Matchers.greaterThan;
9092
import static org.hamcrest.Matchers.hasToString;
9193
import static org.hamcrest.Matchers.instanceOf;
94+
import static org.hamcrest.Matchers.lessThan;
9295
import static org.hamcrest.Matchers.not;
9396
import static org.hamcrest.Matchers.notNullValue;
9497
import static org.hamcrest.Matchers.nullValue;
@@ -651,6 +654,74 @@ public void handleException(TransportException exp) {
651654
}
652655
}
653656

657+
public void testIndexingDataCompression() throws Exception {
658+
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
659+
String component = "cccccccccooooooooooooooommmmmmmmmmmppppppppppprrrrrrrreeeeeeeeeessssssssiiiiiiiiiibbbbbbbbllllllllleeeeee";
660+
StringBuilder builder = new StringBuilder();
661+
for (int i = 0; i < 30; ++i) {
662+
builder.append(component);
663+
}
664+
String text = builder.toString();
665+
TransportRequestHandler<StringMessageRequest> handler = (request, channel, task) -> {
666+
assertThat(text, equalTo(request.message));
667+
try {
668+
channel.sendResponse(new StringMessageResponse(""));
669+
} catch (IOException e) {
670+
logger.error("Unexpected failure", e);
671+
fail(e.getMessage());
672+
}
673+
};
674+
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);
675+
serviceC.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, handler);
676+
677+
Settings settingsWithCompress = Settings.builder()
678+
.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA)
679+
.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
680+
randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
681+
.build();
682+
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
683+
serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
684+
serviceA.connectToNode(serviceC.getLocalDiscoNode(), connectionProfile);
685+
686+
TransportResponseHandler<StringMessageResponse> responseHandler = new TransportResponseHandler<StringMessageResponse>() {
687+
@Override
688+
public StringMessageResponse read(StreamInput in) throws IOException {
689+
return new StringMessageResponse(in);
690+
}
691+
692+
@Override
693+
public String executor() {
694+
return ThreadPool.Names.GENERIC;
695+
}
696+
697+
@Override
698+
public void handleResponse(StringMessageResponse response) {
699+
}
700+
701+
@Override
702+
public void handleException(TransportException exp) {
703+
logger.error("Unexpected failure", exp);
704+
fail("got exception instead of a response: " + exp.getMessage());
705+
}
706+
};
707+
708+
Future<StringMessageResponse> compressed = serviceC.submitRequest(serviceA.getLocalDiscoNode(), "internal:sayHello",
709+
new StringMessageRequest(text, -1, true), responseHandler);
710+
Future<StringMessageResponse> uncompressed = serviceA.submitRequest(serviceC.getLocalDiscoNode(), "internal:sayHello",
711+
new StringMessageRequest(text, -1, false), responseHandler);
712+
713+
compressed.get();
714+
uncompressed.get();
715+
final long bytesLength;
716+
try (BytesStreamOutput output = new BytesStreamOutput()) {
717+
new StringMessageRequest(text, -1).writeTo(output);
718+
bytesLength = output.bytes().length();
719+
}
720+
assertThat(serviceA.transport().getStats().getRxSize().getBytes(), lessThan(bytesLength));
721+
assertThat(serviceC.transport().getStats().getRxSize().getBytes(), greaterThan(bytesLength));
722+
}
723+
}
724+
654725
public void testErrorMessage() {
655726
serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new,
656727
(request, channel, task) -> {
@@ -1188,14 +1259,20 @@ public void handleException(TransportException exp) {
11881259
}
11891260
}
11901261

1191-
public static class StringMessageRequest extends TransportRequest {
1262+
public static class StringMessageRequest extends TransportRequest implements RawIndexingDataTransportRequest {
11921263

11931264
private String message;
11941265
private long timeout;
1266+
private boolean isRawIndexingData = false;
11951267

11961268
StringMessageRequest(String message, long timeout) {
1269+
this(message, timeout, false);
1270+
}
1271+
1272+
StringMessageRequest(String message, long timeout, boolean isRawIndexingData) {
11971273
this.message = message;
11981274
this.timeout = timeout;
1275+
this.isRawIndexingData = isRawIndexingData;
11991276
}
12001277

12011278
public StringMessageRequest(StreamInput in) throws IOException {
@@ -1212,6 +1289,11 @@ public long timeout() {
12121289
return timeout;
12131290
}
12141291

1292+
@Override
1293+
public boolean isRawIndexingData() {
1294+
return isRawIndexingData;
1295+
}
1296+
12151297
@Override
12161298
public void writeTo(StreamOutput out) throws IOException {
12171299
super.writeTo(out);

0 commit comments

Comments
 (0)