-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Use chunked REST serialization for large REST responses #88311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
63dc225
d542b74
b323fd9
ba7cdea
fa63c8c
f9a3215
1ad294d
f022190
a87cfb3
75b036d
299cdba
333da0d
d9b1f8a
84dd503
6077797
a95590e
07bfaab
21cdd21
00ee6a0
6c3b963
9f86863
5b9219a
582dfe8
5f50d87
8a28b04
d327f2d
649481e
144c227
c061b73
a087a5b
7f85c19
53c5047
7358a38
2e9e5d5
c313eb6
9b7281b
d67bb7f
6756bd5
d33bc0b
e7d7893
eb7bd9b
c75f73b
1501636
fff84b6
13b015d
d28a31b
2214a31
b4091d7
00f97e5
fbb56fd
260b474
fa93925
62bd60f
007aa02
fc3897a
0368b58
b319e1b
aee391f
19724aa
19306e2
99cea44
5479a8e
605a867
f13702b
9d0379c
79c0fe2
c29de50
2ee3615
3ec4782
82086c0
34a1657
a7e2e5d
35ad972
e3073b5
a3d7b97
e3a979e
9b06800
ee09bb9
f422312
84940fc
16c9856
1532b41
e6ddf04
857be16
1a4b364
1a76fed
7e6cd61
71dc538
f874c72
cc312d7
1154131
6516482
dcf73e4
911e91d
98ff648
2943f1b
ca30f8c
9743942
99a5aa1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 88311 | ||
| summary: Use chunked REST serialization for large REST responses | ||
| area: Network | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
| * in compliance with, at your election, the Elastic License 2.0 or the Server | ||
| * Side Public License, v 1. | ||
| */ | ||
|
|
||
| package org.elasticsearch.http.netty4; | ||
|
|
||
| import io.netty.handler.codec.http.DefaultHttpResponse; | ||
| import io.netty.handler.codec.http.HttpResponseStatus; | ||
| import io.netty.handler.codec.http.HttpVersion; | ||
|
|
||
| import org.elasticsearch.rest.ChunkedRestResponseBody; | ||
| import org.elasticsearch.rest.RestStatus; | ||
|
|
||
| /** | ||
| * A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}. | ||
| */ | ||
| public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4RestResponse { | ||
|
|
||
| private final int sequence; | ||
|
|
||
| private final ChunkedRestResponseBody body; | ||
|
|
||
| Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) { | ||
| super(version, HttpResponseStatus.valueOf(status.getStatus())); | ||
| this.sequence = sequence; | ||
| this.body = body; | ||
| } | ||
|
|
||
| public ChunkedRestResponseBody body() { | ||
| return body; | ||
| } | ||
|
|
||
| @Override | ||
| public int getSequence() { | ||
| return sequence; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.channel.Channel; | ||
| import io.netty.channel.ChannelDuplexHandler; | ||
| import io.netty.channel.ChannelFuture; | ||
| import io.netty.channel.ChannelHandlerContext; | ||
| import io.netty.channel.ChannelPromise; | ||
| import io.netty.handler.codec.compression.JdkZlibEncoder; | ||
|
|
@@ -26,11 +27,17 @@ | |
|
|
||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.core.Booleans; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.core.Tuple; | ||
| import org.elasticsearch.rest.ChunkedRestResponseBody; | ||
| import org.elasticsearch.transport.Transports; | ||
| import org.elasticsearch.transport.netty4.Netty4Utils; | ||
| import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler; | ||
| import org.elasticsearch.transport.netty4.NettyAllocator; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.channels.ClosedChannelException; | ||
| import java.util.ArrayDeque; | ||
| import java.util.ArrayList; | ||
|
|
@@ -41,13 +48,22 @@ | |
|
|
||
| /** | ||
| * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests. | ||
| * This handler also throttles write operations and will not pass any writes to the next handler so long as the channel is not writable. | ||
| */ | ||
| public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { | ||
|
|
||
| private final Logger logger; | ||
|
|
||
| private final int maxEventsHeld; | ||
| private final PriorityQueue<Tuple<Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue; | ||
| private final PriorityQueue<Tuple<? extends Netty4RestResponse, ChannelPromise>> outboundHoldingQueue; | ||
|
|
||
| private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Netty4ChunkedHttpResponse response) {} | ||
|
|
||
| /** | ||
| * The current {@link ChunkedWrite} if a chunked write is executed at the moment. | ||
| */ | ||
| @Nullable | ||
| private ChunkedWrite currentChunkedWrite; | ||
|
|
||
| /* | ||
| * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the | ||
|
|
@@ -119,36 +135,32 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque | |
| } | ||
|
|
||
| @Override | ||
| public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { | ||
| assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass(); | ||
| public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws IOException { | ||
| assert msg instanceof Netty4RestResponse : "Invalid message type: " + msg.getClass(); | ||
| boolean success = false; | ||
| try { | ||
| final Netty4HttpResponse response = (Netty4HttpResponse) msg; | ||
| if (response.getSequence() != writeSequence) { | ||
| assert response.getSequence() > writeSequence | ||
| : "response sequence [" + response.getSequence() + "] we below write sequence [" + writeSequence + "]"; | ||
| final Netty4RestResponse restResponse = (Netty4RestResponse) msg; | ||
| if (restResponse.getSequence() != writeSequence) { | ||
| assert restResponse.getSequence() > writeSequence | ||
| : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]"; | ||
| if (outboundHoldingQueue.size() >= maxEventsHeld) { | ||
| int eventCount = outboundHoldingQueue.size() + 1; | ||
| throw new IllegalStateException( | ||
| "Too many pipelined events [" + eventCount + "]. Max events allowed [" + maxEventsHeld + "]." | ||
| ); | ||
| } | ||
| // response is not at the current sequence number so we add it to the outbound queue and return | ||
| outboundHoldingQueue.add(new Tuple<>(response, promise)); | ||
| outboundHoldingQueue.add(new Tuple<>(restResponse, promise)); | ||
| success = true; | ||
| return; | ||
| } | ||
|
|
||
| // response is at the current sequence number and does not need to wait for any other response to be written so we write | ||
| // it out directly | ||
| doWrite(ctx, response, promise); | ||
| doWrite(ctx, restResponse, promise); | ||
| success = true; | ||
| // see if we have any queued up responses that became writeable due to the above write | ||
| while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) { | ||
| final Tuple<Netty4HttpResponse, ChannelPromise> top = outboundHoldingQueue.poll(); | ||
| assert top != null : "we know the outbound holding queue to not be empty at this point"; | ||
| doWrite(ctx, top.v1(), top.v2()); | ||
| } | ||
| doWriteQueued(ctx); | ||
| } catch (IllegalStateException e) { | ||
| ctx.channel().close(); | ||
| } finally { | ||
|
|
@@ -158,6 +170,14 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann | |
| } | ||
| } | ||
|
|
||
    /**
     * Drains any queued-up responses that have become writable because {@code writeSequence} advanced.
     * Each successful {@code doWrite} increments {@code writeSequence}, which may unblock the next
     * queued response, so we loop until the head of the queue is no longer the next expected sequence.
     */
    private void doWriteQueued(ChannelHandlerContext ctx) throws IOException {
        while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
            // poll() cannot return null here: the emptiness check above runs on the event loop with no concurrent consumer
            final Tuple<? extends Netty4RestResponse, ChannelPromise> top = outboundHoldingQueue.poll();
            assert top != null : "we know the outbound holding queue to not be empty at this point";
            doWrite(ctx, top.v1(), top.v2());
        }
    }
|
|
||
| private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses"; | ||
|
|
||
| private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES; | ||
|
|
@@ -169,6 +189,15 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann | |
| SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99); | ||
| } | ||
|
|
||
| private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException { | ||
| assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]"; | ||
| if (readyResponse instanceof Netty4HttpResponse) { | ||
| doWrite(ctx, (Netty4HttpResponse) readyResponse, promise); | ||
| } else { | ||
| doWrite(ctx, (Netty4ChunkedHttpResponse) readyResponse, promise); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline. | ||
| */ | ||
|
|
@@ -181,6 +210,31 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse | |
| writeSequence++; | ||
| } | ||
|
|
||
    /**
     * Starts a chunked write: installs {@code currentChunkedWrite}, writes the response head, then
     * eagerly writes body chunks while the channel stays writable. If the channel becomes unwritable
     * (or the head had to be queued), the write is continued later from {@code doFlush} via
     * {@code currentChunkedWrite} when writability returns.
     */
    private void doWrite(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise) throws IOException {
        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        final ChannelPromise first = ctx.newPromise();
        combiner.add((Future<Void>) first);
        // must be set before the first write so doFlush can pick the write up if we bail out early
        currentChunkedWrite = new ChunkedWrite(combiner, promise, readyResponse);
        if (enqueueWrite(ctx, readyResponse, first)) {
            // we were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable
            while (ctx.channel().isWritable()) {
                if (writeChunk(ctx, combiner, readyResponse.body())) {
                    // writeChunk returned true -> body fully serialized, complete the chunked write
                    finishChunkedWrite();
                    return;
                }
            }
        }
    }
|
|
||
    /**
     * Completes the in-flight chunked write: finishes the promise combiner so the caller's promise
     * resolves once all chunk writes do, then clears the in-flight state and advances the write
     * sequence. The finally block guarantees the sequence advances even if finish() throws, so
     * pipelining is not wedged.
     */
    private void finishChunkedWrite() {
        try {
            currentChunkedWrite.combiner.finish(currentChunkedWrite.onDone);
        } finally {
            currentChunkedWrite = null;
            writeSequence++;
        }
    }
|
|
||
| private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, ChannelPromise promise) { | ||
| final PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); | ||
| HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers()); | ||
|
|
@@ -193,15 +247,15 @@ private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, Ch | |
| combiner.finish(promise); | ||
| } | ||
|
|
||
| public void channelWritabilityChanged(ChannelHandlerContext ctx) { | ||
| public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException { | ||
| if (ctx.channel().isWritable()) { | ||
| doFlush(ctx); | ||
| } | ||
| ctx.fireChannelWritabilityChanged(); | ||
| } | ||
|
|
||
| @Override | ||
| public void flush(ChannelHandlerContext ctx) { | ||
| public void flush(ChannelHandlerContext ctx) throws IOException { | ||
| if (doFlush(ctx) == false) { | ||
| ctx.flush(); | ||
| } | ||
|
|
@@ -218,16 +272,30 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { | |
| * | ||
| * @return true if a call to this method resulted in a call to {@link ChannelHandlerContext#flush()} on the given {@code ctx} | ||
| */ | ||
| private boolean doFlush(ChannelHandlerContext ctx) { | ||
| private boolean doFlush(ChannelHandlerContext ctx) throws IOException { | ||
| assert ctx.executor().inEventLoop(); | ||
| final Channel channel = ctx.channel(); | ||
| if (channel.isActive() == false) { | ||
| failQueuedWrites(); | ||
| return false; | ||
| } | ||
| while (channel.isWritable()) { | ||
| final WriteOperation currentWrite = queuedWrites.poll(); | ||
| WriteOperation currentWrite = queuedWrites.poll(); | ||
| if (currentWrite == null) { | ||
| doWriteQueued(ctx); | ||
| if (channel.isWritable() == false) { | ||
| break; | ||
| } | ||
| currentWrite = queuedWrites.poll(); | ||
| } | ||
| if (currentWrite == null) { | ||
| // no bytes were found queued, check if a chunked message might have become writable | ||
| if (currentChunkedWrite != null) { | ||
| if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.response.body())) { | ||
| finishChunkedWrite(); | ||
| } | ||
| continue; | ||
| } | ||
| break; | ||
| } | ||
| ctx.write(currentWrite.msg, currentWrite.promise); | ||
|
|
@@ -239,6 +307,21 @@ private boolean doFlush(ChannelHandlerContext ctx) { | |
| return true; | ||
| } | ||
|
|
||
    /**
     * Serializes and writes the next chunk of {@code body}.
     *
     * @return true once the body reports itself done (the last chunk, wrapped in a
     *         {@link DefaultLastHttpContent}, has been written), false otherwise
     */
    private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner, ChunkedRestResponseBody body) throws IOException {
        assert body.isDone() == false : "should not continue to try and serialize once done";
        final ReleasableBytesReference bytes = body.encodeChunk(
            Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE,
            serverTransport.recycler()
        );
        // NOTE(review): an empty chunk would stall progress; this assert guards it, but it may not
        // hold for every future ChunkedRestResponseBody implementation (e.g. an empty final chunk)
        // — revisit if non-JSON chunked bodies are added.
        assert bytes.length() > 0 : "serialization should not produce empty buffers";
        final ByteBuf content = Netty4Utils.toByteBuf(bytes);
        // must check isDone() after encodeChunk so the last chunk is sent as LastHttpContent
        final boolean done = body.isDone();
        final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
        // release the pooled bytes only once Netty has flushed (or failed) the write
        f.addListener(ignored -> bytes.close());
        combiner.add(f);
        return done;
    }
|
|
||
| private void failQueuedWrites() { | ||
| WriteOperation queuedWrite; | ||
| while ((queuedWrite = queuedWrites.poll()) != null) { | ||
|
|
@@ -248,32 +331,43 @@ private void failQueuedWrites() { | |
|
|
||
    /**
     * Fails the in-flight chunked write (if any) and every queued, not-yet-written response with a
     * {@link ClosedChannelException} before delegating the actual close, so no caller promise is
     * left dangling when the channel goes away.
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        if (currentChunkedWrite != null) {
            safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
            currentChunkedWrite = null;
        }
        List<Tuple<? extends Netty4RestResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();

        if (inflightResponses.isEmpty() == false) {
            ClosedChannelException closedChannelException = new ClosedChannelException();
            for (Tuple<? extends Netty4RestResponse, ChannelPromise> inflightResponse : inflightResponses) {
                safeFailPromise(inflightResponse.v2(), closedChannelException);
            }
        }
        ctx.close(promise);
    }
|
|
||
| private void safeFailPromise(ChannelPromise promise, Exception ex) { | ||
| try { | ||
| promise.setFailure(ex); | ||
| } catch (RuntimeException e) { | ||
| logger.error("unexpected error while releasing pipelined http responses", e); | ||
| } | ||
| } | ||
|
|
||
    /**
     * Writes {@code msg} immediately if possible, otherwise queues it; returns a future that
     * completes when the message has actually been written.
     */
    private Future<Void> enqueueWrite(ChannelHandlerContext ctx, HttpObject msg) {
        final ChannelPromise p = ctx.newPromise();
        enqueueWrite(ctx, msg, p);
        return p;
    }

    /**
     * Writes {@code msg} directly when the channel is writable and nothing is already queued
     * (ordering must be preserved); otherwise adds it to {@code queuedWrites} for {@code doFlush}
     * to drain later.
     *
     * @return true if the write was actually executed and false if it was just queued up
     */
    private boolean enqueueWrite(ChannelHandlerContext ctx, HttpObject msg, ChannelPromise promise) {
        if (ctx.channel().isWritable() && queuedWrites.isEmpty()) {
            ctx.write(msg, promise);
            return true;
        } else {
            queuedWrites.add(new WriteOperation(msg, promise));
            return false;
        }
    }
|
|
||
|
|
@@ -290,8 +384,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { | |
| } | ||
| } | ||
|
|
||
| private List<Tuple<Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() { | ||
| ArrayList<Tuple<Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue); | ||
| private List<Tuple<? extends Netty4RestResponse, ChannelPromise>> removeAllInflightResponses() { | ||
| ArrayList<Tuple<? extends Netty4RestResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue); | ||
| outboundHoldingQueue.clear(); | ||
| return responses; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not need a flush here too if the channel is not writeable?
I wonder if we can simplify the logic here, perhaps splitting into 3 methods with the 3 concerns (outer queue, queued writes and chunked writes)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
heh I knew you'd notice :) I think the whole business of flushing in the loop is nonsense for normal messages right now and just a waste of cycles. I didn't add it to the chunked write loop on purpose and was going to remove it for normal writes in a follow-up.
Calling flush here will never make the channel writable again, all it will do is enqueue a redundant flush task on the executor. If we aren't writable any more we should exit the loop, pass a single flush along and wait for the writability-changed callback to be called again. I think with that in mind, it's simple enough to keep the code as currently is? (at least IMO here it's slightly easier to read having everything in one method body)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the flush first for the normal writes, in a separate PR? That would simplify this PR a bit and also ensure that not flushing is sound.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure :) #89647