Skip to content

Commit 31251c9

Browse files
authored
Make http pipelining support mandatory (#30695)
This is related to #29500 and #28898. This commit removes the abilitiy to disable http pipelining. After this commit, any elasticsearch node will support pipelined requests from a client. Additionally, it extracts some of the http pipelining work to the server module. This extracted work is used to implement pipelining for the nio plugin.
1 parent 37f67d9 commit 31251c9

33 files changed

+990
-647
lines changed

docs/reference/migration/migrate_7_0/settings.asciidoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@
2929
[[remove-http-enabled]]
3030
==== Http enabled setting removed
3131

32-
The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
32+
* The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
3333
use of the transport client. This setting has been removed, as the transport client
3434
will be removed in the future, thus requiring HTTP to always be enabled.
35+
36+
[[remove-http-pipelining-setting]]
37+
==== Http pipelining setting removed
38+
39+
* The setting `http.pipelining` previously allowed disabling HTTP pipelining support.
40+
This setting has been removed, as disabling http pipelining support on the server
41+
provided little value. The setting `http.pipelining.max_events` can still be used to
42+
limit the number of pipelined requests in-flight.

docs/reference/modules/http.asciidoc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,6 @@ and stack traces in response output. Note: When set to `false` and the `error_tr
9696
parameter is specified, an error will be returned; when `error_trace` is not specified, a
9797
simple message will be returned. Defaults to `true`
9898

99-
|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.
100-
10199
|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.
102100

103101
|`http.max_warning_header_count` |The maximum number of warning headers in

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.elasticsearch.common.util.concurrent.ThreadContext;
4343
import org.elasticsearch.http.HttpHandlingSettings;
4444
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
45-
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
4645
import org.elasticsearch.rest.AbstractRestChannel;
4746
import org.elasticsearch.rest.RestResponse;
4847
import org.elasticsearch.rest.RestStatus;
@@ -59,29 +58,24 @@ final class Netty4HttpChannel extends AbstractRestChannel {
5958
private final Netty4HttpServerTransport transport;
6059
private final Channel channel;
6160
private final FullHttpRequest nettyRequest;
62-
private final HttpPipelinedRequest pipelinedRequest;
61+
private final int sequence;
6362
private final ThreadContext threadContext;
6463
private final HttpHandlingSettings handlingSettings;
6564

6665
/**
67-
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
68-
* @param request The request that is handled by this channel.
69-
* @param pipelinedRequest If HTTP pipelining is enabled provide the corresponding pipelined request. May be null if
70-
* HTTP pipelining is disabled.
71-
* @param handlingSettings true iff error messages should include stack traces.
72-
* @param threadContext the thread context for the channel
66+
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
67+
* @param request The request that is handled by this channel.
68+
* @param sequence The pipelining sequence number for this request
69+
* @param handlingSettings true if error messages should include stack traces.
70+
* @param threadContext the thread context for the channel
7371
*/
74-
Netty4HttpChannel(
75-
final Netty4HttpServerTransport transport,
76-
final Netty4HttpRequest request,
77-
final HttpPipelinedRequest pipelinedRequest,
78-
final HttpHandlingSettings handlingSettings,
79-
final ThreadContext threadContext) {
72+
Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
73+
ThreadContext threadContext) {
8074
super(request, handlingSettings.getDetailedErrorsEnabled());
8175
this.transport = transport;
8276
this.channel = request.getChannel();
8377
this.nettyRequest = request.request();
84-
this.pipelinedRequest = pipelinedRequest;
78+
this.sequence = sequence;
8579
this.threadContext = threadContext;
8680
this.handlingSettings = handlingSettings;
8781
}
@@ -129,7 +123,7 @@ public void sendResponse(RestResponse response) {
129123
final ChannelPromise promise = channel.newPromise();
130124

131125
if (releaseContent) {
132-
promise.addListener(f -> ((Releasable)content).close());
126+
promise.addListener(f -> ((Releasable) content).close());
133127
}
134128

135129
if (releaseBytesStreamOutput) {
@@ -140,13 +134,9 @@ public void sendResponse(RestResponse response) {
140134
promise.addListener(ChannelFutureListener.CLOSE);
141135
}
142136

143-
final Object msg;
144-
if (pipelinedRequest != null) {
145-
msg = pipelinedRequest.createHttpResponse(resp, promise);
146-
} else {
147-
msg = resp;
148-
}
149-
channel.writeAndFlush(msg, promise);
137+
Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);
138+
139+
channel.writeAndFlush(newResponse, promise);
150140
releaseContent = false;
151141
releaseBytesStreamOutput = false;
152142
} finally {
@@ -156,9 +146,6 @@ public void sendResponse(RestResponse response) {
156146
if (releaseBytesStreamOutput) {
157147
bytesOutputOrNull().close();
158148
}
159-
if (pipelinedRequest != null) {
160-
pipelinedRequest.release();
161-
}
162149
}
163150
}
164151

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.http.netty4;
21+
22+
import io.netty.channel.ChannelDuplexHandler;
23+
import io.netty.channel.ChannelHandlerContext;
24+
import io.netty.channel.ChannelPromise;
25+
import io.netty.handler.codec.http.LastHttpContent;
26+
import org.apache.logging.log4j.Logger;
27+
import org.elasticsearch.common.collect.Tuple;
28+
import org.elasticsearch.http.HttpPipelinedRequest;
29+
import org.elasticsearch.http.HttpPipeliningAggregator;
30+
import org.elasticsearch.transport.netty4.Netty4Utils;
31+
32+
import java.nio.channels.ClosedChannelException;
33+
import java.util.Collections;
34+
import java.util.List;
35+
36+
/**
37+
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
38+
*/
39+
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
40+
41+
private final Logger logger;
42+
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;
43+
44+
/**
45+
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
46+
*
47+
* @param logger for logging unexpected errors
48+
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
49+
* required as events cannot queue up indefinitely
50+
*/
51+
public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
52+
this.logger = logger;
53+
this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld);
54+
}
55+
56+
@Override
57+
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
58+
if (msg instanceof LastHttpContent) {
59+
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg).retain());
60+
ctx.fireChannelRead(pipelinedRequest);
61+
} else {
62+
ctx.fireChannelRead(msg);
63+
}
64+
}
65+
66+
@Override
67+
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
68+
assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class;
69+
Netty4HttpResponse response = (Netty4HttpResponse) msg;
70+
boolean success = false;
71+
try {
72+
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
73+
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
74+
ctx.write(readyResponse.v1().getResponse(), readyResponse.v2());
75+
}
76+
success = true;
77+
} catch (IllegalStateException e) {
78+
ctx.channel().close();
79+
} finally {
80+
if (success == false) {
81+
promise.setFailure(new ClosedChannelException());
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
88+
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
89+
90+
if (inflightResponses.isEmpty() == false) {
91+
ClosedChannelException closedChannelException = new ClosedChannelException();
92+
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
93+
try {
94+
inflightResponse.v2().setFailure(closedChannelException);
95+
} catch (RuntimeException e) {
96+
logger.error("unexpected error while releasing pipelined http responses", e);
97+
}
98+
}
99+
}
100+
ctx.close(promise);
101+
}
102+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,41 +30,30 @@
3030
import io.netty.handler.codec.http.HttpHeaders;
3131
import org.elasticsearch.common.util.concurrent.ThreadContext;
3232
import org.elasticsearch.http.HttpHandlingSettings;
33-
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
33+
import org.elasticsearch.http.HttpPipelinedRequest;
3434
import org.elasticsearch.rest.RestRequest;
3535
import org.elasticsearch.transport.netty4.Netty4Utils;
3636

3737
import java.util.Collections;
3838

3939
@ChannelHandler.Sharable
40-
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
40+
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
4141

4242
private final Netty4HttpServerTransport serverTransport;
4343
private final HttpHandlingSettings handlingSettings;
44-
private final boolean httpPipeliningEnabled;
4544
private final ThreadContext threadContext;
4645

4746
Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings,
4847
ThreadContext threadContext) {
4948
this.serverTransport = serverTransport;
50-
this.httpPipeliningEnabled = serverTransport.pipelining;
5149
this.handlingSettings = handlingSettings;
5250
this.threadContext = threadContext;
5351
}
5452

5553
@Override
56-
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
57-
final FullHttpRequest request;
58-
final HttpPipelinedRequest pipelinedRequest;
59-
if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) {
60-
pipelinedRequest = (HttpPipelinedRequest) msg;
61-
request = (FullHttpRequest) pipelinedRequest.last();
62-
} else {
63-
pipelinedRequest = null;
64-
request = (FullHttpRequest) msg;
65-
}
54+
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
55+
final FullHttpRequest request = msg.getRequest();
6656

67-
boolean success = false;
6857
try {
6958

7059
final FullHttpRequest copy =
@@ -111,7 +100,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
111100
Netty4HttpChannel innerChannel;
112101
try {
113102
innerChannel =
114-
new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, handlingSettings, threadContext);
103+
new Netty4HttpChannel(serverTransport, httpRequest, msg.getSequence(), handlingSettings, threadContext);
115104
} catch (final IllegalArgumentException e) {
116105
if (badRequestCause == null) {
117106
badRequestCause = e;
@@ -126,7 +115,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
126115
copy,
127116
ctx.channel());
128117
innerChannel =
129-
new Netty4HttpChannel(serverTransport, innerRequest, pipelinedRequest, handlingSettings, threadContext);
118+
new Netty4HttpChannel(serverTransport, innerRequest, msg.getSequence(), handlingSettings, threadContext);
130119
}
131120
channel = innerChannel;
132121
}
@@ -138,12 +127,9 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
138127
} else {
139128
serverTransport.dispatchRequest(httpRequest, channel);
140129
}
141-
success = true;
142130
} finally {
143-
// the request is otherwise released in case of dispatch
144-
if (success == false && pipelinedRequest != null) {
145-
pipelinedRequest.release();
146-
}
131+
// As we have copied the buffer, we can release the request
132+
request.release();
147133
}
148134
}
149135

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.http.netty4;
21+
22+
import io.netty.handler.codec.http.FullHttpResponse;
23+
import org.elasticsearch.http.HttpPipelinedMessage;
24+
25+
public class Netty4HttpResponse extends HttpPipelinedMessage {
26+
27+
private final FullHttpResponse response;
28+
29+
public Netty4HttpResponse(int sequence, FullHttpResponse response) {
30+
super(sequence);
31+
this.response = response;
32+
}
33+
34+
public FullHttpResponse getResponse() {
35+
return response;
36+
}
37+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
6363
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
6464
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
65-
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
6665
import org.elasticsearch.rest.RestUtils;
6766
import org.elasticsearch.threadpool.ThreadPool;
6867
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
@@ -99,7 +98,6 @@
9998
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
10099
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
101100
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
102-
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
103101
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
104102
import static org.elasticsearch.http.netty4.cors.Netty4CorsHandler.ANY_ORIGIN;
105103

@@ -162,8 +160,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
162160

163161
protected final int workerCount;
164162

165-
protected final boolean pipelining;
166-
167163
protected final int pipeliningMaxEvents;
168164

169165
/**
@@ -204,14 +200,16 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
204200
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
205201
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
206202
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
203+
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
207204
this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()),
208205
Math.toIntExact(maxChunkSize.getBytes()),
209206
Math.toIntExact(maxHeaderSize.getBytes()),
210207
Math.toIntExact(maxInitialLineLength.getBytes()),
211208
SETTING_HTTP_RESET_COOKIES.get(settings),
212209
SETTING_HTTP_COMPRESSION.get(settings),
213210
SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
214-
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings));
211+
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
212+
pipeliningMaxEvents);
215213

216214
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
217215
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
@@ -226,14 +224,12 @@ public Netty4HttpServerTransport(Settings settings, NetworkService networkServic
226224
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
227225
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
228226

229-
this.pipelining = SETTING_PIPELINING.get(settings);
230-
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
231227
this.corsConfig = buildCorsConfig(settings);
232228

233229
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
234-
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining[{}], pipelining_max_events[{}]",
235-
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictor, maxCompositeBufferComponents,
236-
pipelining, pipeliningMaxEvents);
230+
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
231+
maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, receivePredictor, maxCompositeBufferComponents,
232+
pipeliningMaxEvents);
237233
}
238234

239235
public Settings settings() {
@@ -452,9 +448,7 @@ protected void initChannel(Channel ch) throws Exception {
452448
if (SETTING_CORS_ENABLED.get(transport.settings())) {
453449
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
454450
}
455-
if (transport.pipelining) {
456-
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
457-
}
451+
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
458452
ch.pipeline().addLast("handler", requestHandler);
459453
}
460454

0 commit comments

Comments
 (0)