Skip to content

Commit 38b67f7

Browse files
authored
Add int indicating size of transport header (#50085)
Currently we do not know the size of the transport header (map of request response headers, features array, and action name). This means that we must read the entire transport message to dependably act on the headers. This commit adds an int indicating the size of the transport headers. With this addition we can act upon the headers prior to reading the entire message.
1 parent e9e2e5f commit 38b67f7

File tree

7 files changed

+116
-82
lines changed

7 files changed

+116
-82
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ public void setFeatures(final Set<String> features) {
146146
this.features = Collections.unmodifiableSet(new HashSet<>(features));
147147
}
148148

149+
public Set<String> getFeatures() {
150+
return this.features;
151+
}
152+
149153
public long position() throws IOException {
150154
throw new UnsupportedOperationException();
151155
}

server/src/main/java/org/elasticsearch/transport/InboundMessage.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
package org.elasticsearch.transport;
2020

2121
import org.elasticsearch.Version;
22-
import org.elasticsearch.common.Nullable;
2322
import org.elasticsearch.common.bytes.BytesReference;
24-
import org.elasticsearch.common.compress.Compressor;
2523
import org.elasticsearch.common.compress.CompressorFactory;
2624
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2725
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -62,10 +60,6 @@ static class Reader {
6260
}
6361

6462
InboundMessage deserialize(BytesReference reference) throws IOException {
65-
int messageLengthBytes = reference.length();
66-
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
67-
// we have additional bytes to read, outside of the header
68-
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
6963
StreamInput streamInput = reference.streamInput();
7064
boolean success = false;
7165
try (ThreadContext.StoredContext existing = threadContext.stashContext()) {
@@ -74,23 +68,13 @@ InboundMessage deserialize(BytesReference reference) throws IOException {
7468
Version remoteVersion = Version.fromId(streamInput.readInt());
7569
final boolean isHandshake = TransportStatus.isHandshake(status);
7670
ensureVersionCompatibility(remoteVersion, version, isHandshake);
77-
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamInput.available() > 0) {
78-
Compressor compressor = getCompressor(reference);
79-
if (compressor == null) {
80-
int maxToRead = Math.min(reference.length(), 10);
81-
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
82-
.append(maxToRead).append("] content bytes out of [").append(reference.length())
83-
.append("] readable bytes with message size [").append(messageLengthBytes).append("] ").append("] are [");
84-
for (int i = 0; i < maxToRead; i++) {
85-
sb.append(reference.get(i)).append(",");
86-
}
87-
sb.append("]");
88-
throw new IllegalStateException(sb.toString());
89-
}
90-
streamInput = compressor.streamInput(streamInput);
71+
72+
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
73+
// Consume the variable header size
74+
streamInput.readInt();
75+
} else {
76+
streamInput = decompressingStream(status, remoteVersion, streamInput);
9177
}
92-
streamInput = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry);
93-
streamInput.setVersion(remoteVersion);
9478

9579
threadContext.readHeaders(streamInput);
9680

@@ -108,8 +92,17 @@ InboundMessage deserialize(BytesReference reference) throws IOException {
10892
features = Collections.emptySet();
10993
}
11094
final String action = streamInput.readString();
95+
96+
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
97+
streamInput = decompressingStream(status, remoteVersion, streamInput);
98+
}
99+
streamInput = namedWriteableStream(streamInput, remoteVersion);
111100
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
112101
} else {
102+
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
103+
streamInput = decompressingStream(status, remoteVersion, streamInput);
104+
}
105+
streamInput = namedWriteableStream(streamInput, remoteVersion);
113106
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
114107
}
115108
success = true;
@@ -120,13 +113,26 @@ InboundMessage deserialize(BytesReference reference) throws IOException {
120113
}
121114
}
122115
}
123-
}
124116

125-
@Nullable
126-
static Compressor getCompressor(BytesReference message) {
127-
final int offset = TcpHeader.REQUEST_ID_SIZE + TcpHeader.STATUS_SIZE + TcpHeader.VERSION_ID_SIZE;
128-
return CompressorFactory.COMPRESSOR.isCompressed(message.slice(offset, message.length() - offset))
129-
? CompressorFactory.COMPRESSOR : null;
117+
static StreamInput decompressingStream(byte status, Version remoteVersion, StreamInput streamInput) throws IOException {
118+
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
119+
try {
120+
StreamInput decompressor = CompressorFactory.COMPRESSOR.streamInput(streamInput);
121+
decompressor.setVersion(remoteVersion);
122+
return decompressor;
123+
} catch (IllegalArgumentException e) {
124+
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
125+
}
126+
} else {
127+
return streamInput;
128+
}
129+
}
130+
131+
private StreamInput namedWriteableStream(StreamInput delegate, Version remoteVersion) {
132+
NamedWriteableAwareStreamInput streamInput = new NamedWriteableAwareStreamInput(delegate, namedWriteableRegistry);
133+
streamInput.setVersion(remoteVersion);
134+
return streamInput;
135+
}
130136
}
131137

132138
@Override

server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.io.IOException;
3131
import java.util.Set;
3232

33-
abstract class OutboundMessage extends NetworkMessage implements Writeable {
33+
abstract class OutboundMessage extends NetworkMessage {
3434

3535
private final Writeable message;
3636

@@ -42,22 +42,39 @@ abstract class OutboundMessage extends NetworkMessage implements Writeable {
4242
BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
4343
storedContext.restore();
4444
bytesStream.setVersion(version);
45-
bytesStream.skip(TcpHeader.HEADER_SIZE);
45+
bytesStream.skip(TcpHeader.headerSize(version));
4646

4747
// The compressible bytes stream will not close the underlying bytes stream
4848
BytesReference reference;
49+
int variableHeaderLength = -1;
50+
final long preHeaderPosition = bytesStream.position();
51+
52+
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
53+
writeVariableHeader(bytesStream);
54+
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
55+
}
56+
4957
try (CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
5058
stream.setVersion(version);
51-
threadContext.writeTo(stream);
52-
writeTo(stream);
59+
stream.setFeatures(bytesStream.getFeatures());
60+
61+
if (variableHeaderLength == -1) {
62+
writeVariableHeader(stream);
63+
}
5364
reference = writeMessage(stream);
5465
}
66+
5567
bytesStream.seek(0);
56-
TcpHeader.writeHeader(bytesStream, requestId, status, version, reference.length() - TcpHeader.HEADER_SIZE);
68+
final int contentSize = reference.length() - TcpHeader.headerSize(version);
69+
TcpHeader.writeHeader(bytesStream, requestId, status, version, contentSize, variableHeaderLength);
5770
return reference;
5871
}
5972

60-
private BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
73+
protected void writeVariableHeader(StreamOutput stream) throws IOException {
74+
threadContext.writeTo(stream);
75+
}
76+
77+
protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
6178
final BytesReference zeroCopyBuffer;
6279
if (message instanceof BytesTransportRequest) {
6380
BytesTransportRequest bRequest = (BytesTransportRequest) message;
@@ -96,11 +113,12 @@ static class Request extends OutboundMessage {
96113
}
97114

98115
@Override
99-
public void writeTo(StreamOutput out) throws IOException {
116+
protected void writeVariableHeader(StreamOutput stream) throws IOException {
117+
super.writeVariableHeader(stream);
100118
if (version.onOrAfter(Version.V_6_3_0)) {
101-
out.writeStringArray(features);
119+
stream.writeStringArray(features);
102120
}
103-
out.writeString(action);
121+
stream.writeString(action);
104122
}
105123

106124
private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
@@ -128,8 +146,9 @@ static class Response extends OutboundMessage {
128146
}
129147

130148
@Override
131-
public void writeTo(StreamOutput out) throws IOException {
132-
out.setFeatures(features);
149+
protected void writeVariableHeader(StreamOutput stream) throws IOException {
150+
super.writeVariableHeader(stream);
151+
stream.setFeatures(features);
133152
}
134153

135154
private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {

server/src/main/java/org/elasticsearch/transport/TcpHeader.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import java.io.IOException;
2626

2727
public class TcpHeader {
28-
public static final int MARKER_BYTES_SIZE = 2 * 1;
28+
29+
public static final Version VERSION_WITH_HEADER_SIZE = Version.V_7_6_0;
30+
31+
public static final int MARKER_BYTES_SIZE = 2;
2932

3033
public static final int MESSAGE_LENGTH_SIZE = 4;
3134

@@ -35,15 +38,36 @@ public class TcpHeader {
3538

3639
public static final int VERSION_ID_SIZE = 4;
3740

38-
public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
41+
public static final int VARIABLE_HEADER_SIZE = 4;
42+
43+
private static final int PRE_76_HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;
44+
45+
private static final int HEADER_SIZE = PRE_76_HEADER_SIZE + VARIABLE_HEADER_SIZE;
46+
47+
public static int headerSize(Version version) {
48+
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
49+
return HEADER_SIZE;
50+
} else {
51+
return PRE_76_HEADER_SIZE;
52+
}
53+
}
3954

40-
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int messageSize) throws IOException {
55+
public static void writeHeader(StreamOutput output, long requestId, byte status, Version version, int contentSize,
56+
int variableHeaderSize) throws IOException {
4157
output.writeByte((byte)'E');
4258
output.writeByte((byte)'S');
4359
// write the size, the size indicates the remaining message size, not including the size int
44-
output.writeInt(messageSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
60+
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
61+
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE + VARIABLE_HEADER_SIZE);
62+
} else {
63+
output.writeInt(contentSize + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE);
64+
}
4565
output.writeLong(requestId);
4666
output.writeByte(status);
4767
output.writeInt(version.id);
68+
if (version.onOrAfter(VERSION_WITH_HEADER_SIZE)) {
69+
assert variableHeaderSize != -1 : "Variable header size not set";
70+
output.writeInt(variableHeaderSize);
71+
}
4872
}
4973
}

server/src/main/java/org/elasticsearch/transport/TransportLogger.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.common.bytes.BytesReference;
25-
import org.elasticsearch.common.compress.Compressor;
26-
import org.elasticsearch.common.compress.NotCompressedException;
2725
import org.elasticsearch.common.io.stream.StreamInput;
2826
import org.elasticsearch.common.util.concurrent.ThreadContext;
2927
import org.elasticsearch.core.internal.io.IOUtils;
@@ -77,26 +75,24 @@ private static String format(TcpChannel channel, BytesReference message, String
7775
final byte status = streamInput.readByte();
7876
final boolean isRequest = TransportStatus.isRequest(status);
7977
final String type = isRequest ? "request" : "response";
80-
final String version = Version.fromId(streamInput.readInt()).toString();
78+
Version version = Version.fromId(streamInput.readInt());
8179
sb.append(" [length: ").append(messageLengthWithHeader);
8280
sb.append(", request id: ").append(requestId);
8381
sb.append(", type: ").append(type);
8482
sb.append(", version: ").append(version);
8583

84+
if (version.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
85+
sb.append(", header size: ").append(streamInput.readInt()).append('B');
86+
} else {
87+
streamInput = InboundMessage.Reader.decompressingStream(status, version, streamInput);
88+
}
89+
90+
// read and discard headers
91+
ThreadContext.readHeadersFromStream(streamInput);
92+
8693
if (isRequest) {
87-
if (TransportStatus.isCompress(status)) {
88-
Compressor compressor;
89-
compressor = InboundMessage.getCompressor(message);
90-
if (compressor == null) {
91-
throw new IllegalStateException(new NotCompressedException());
92-
}
93-
streamInput = compressor.streamInput(streamInput);
94-
}
95-
96-
// read and discard headers
97-
ThreadContext.readHeadersFromStream(streamInput);
98-
// now we decode the features
9994
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {
95+
// discard features
10096
streamInput.readStringArray();
10197
}
10298
sb.append(", action: ").append(streamInput.readString());

server/src/test/java/org/elasticsearch/transport/InboundMessageTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,14 @@ public void testThrowOnNotCompressed() throws Exception {
191191
reference = request.serialize(streamOutput);
192192
}
193193
final byte[] serialized = BytesReference.toBytes(reference);
194-
final int statusPosition = TcpHeader.HEADER_SIZE - TcpHeader.VERSION_ID_SIZE - 1;
194+
final int statusPosition = TcpHeader.headerSize(Version.CURRENT) - TcpHeader.VERSION_ID_SIZE - TcpHeader.VARIABLE_HEADER_SIZE - 1;
195195
// force status byte to signal compressed on the otherwise uncompressed message
196196
serialized[statusPosition] = TransportStatus.setCompress(serialized[statusPosition]);
197197
reference = new BytesArray(serialized);
198198
InboundMessage.Reader reader = new InboundMessage.Reader(Version.CURRENT, registry, threadContext);
199199
BytesReference sliced = reference.slice(6, reference.length() - 6);
200200
final IllegalStateException iste = expectThrows(IllegalStateException.class, () -> reader.deserialize(sliced));
201-
assertThat(iste.getMessage(), Matchers.startsWith("stream marked as compressed, but no compressor found,"));
201+
assertThat(iste.getMessage(), Matchers.equalTo("stream marked as compressed, but is missing deflate header"));
202202
}
203203

204204
private void testVersionIncompatibility(Version version, Version currentVersion, boolean isHandshake) throws IOException {

server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
2525
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
2626
import org.elasticsearch.common.bytes.BytesReference;
27-
import org.elasticsearch.common.bytes.CompositeBytesReference;
2827
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2928
import org.elasticsearch.common.logging.Loggers;
3029
import org.elasticsearch.common.settings.Settings;
@@ -61,6 +60,7 @@ public void testLoggingHandler() throws IOException {
6160
", request id: \\d+" +
6261
", type: request" +
6362
", version: .*" +
63+
", header size: \\d+B" +
6464
", action: cluster:monitor/stats]" +
6565
" WRITE: \\d+B";
6666
final MockLogAppender.LoggingExpectation writeExpectation =
@@ -72,6 +72,7 @@ public void testLoggingHandler() throws IOException {
7272
", request id: \\d+" +
7373
", type: request" +
7474
", version: .*" +
75+
", header size: \\d+B" +
7576
", action: cluster:monitor/stats]" +
7677
" READ: \\d+B";
7778

@@ -88,27 +89,11 @@ public void testLoggingHandler() throws IOException {
8889
}
8990

9091
private BytesReference buildRequest() throws IOException {
91-
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
92-
messageOutput.setVersion(Version.CURRENT);
93-
ThreadContext context = new ThreadContext(Settings.EMPTY);
94-
context.writeTo(messageOutput);
95-
messageOutput.writeStringArray(new String[0]);
96-
messageOutput.writeString(ClusterStatsAction.NAME);
97-
new ClusterStatsRequest().writeTo(messageOutput);
98-
BytesReference messageBody = messageOutput.bytes();
99-
final BytesReference header = buildHeader(randomInt(30), messageBody.length());
100-
return new CompositeBytesReference(header, messageBody);
101-
}
102-
}
103-
104-
private BytesReference buildHeader(long requestId, int length) throws IOException {
105-
try (BytesStreamOutput headerOutput = new BytesStreamOutput(TcpHeader.HEADER_SIZE)) {
106-
headerOutput.setVersion(Version.CURRENT);
107-
TcpHeader.writeHeader(headerOutput, requestId, TransportStatus.setRequest((byte) 0), Version.CURRENT, length);
108-
final BytesReference bytes = headerOutput.bytes();
109-
assert bytes.length() == TcpHeader.HEADER_SIZE : "header size mismatch expected: " + TcpHeader.HEADER_SIZE + " but was: "
110-
+ bytes.length();
111-
return bytes;
92+
boolean compress = randomBoolean();
93+
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
94+
OutboundMessage.Request request = new OutboundMessage.Request(new ThreadContext(Settings.EMPTY), new String[0],
95+
new ClusterStatsRequest(), Version.CURRENT, ClusterStatsAction.NAME, randomInt(30), false, compress);
96+
return request.serialize(bytesStreamOutput);
11297
}
11398
}
11499
}

0 commit comments

Comments
 (0)