From 8c12be37b73fc48c69f4da56de6cd768146690b5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 18 Jul 2019 10:56:19 +0200 Subject: [PATCH 01/19] Stop Copying Every Http Request in Message Handler * Copying the request is not necessary here. We can simply release it once the response has been generated and a lot of `Unpooled` allocations that way * Relates #32228 * I think the issue that preventet that PR that PR from being merged was solved by #39634 that moved the bulk index marker search to ByteBuf bulk access so the composite buffer shouldn't require many additional bounds checks (I'd argue the bounds checks we add, we save when copying the composite buffer) * I couldn't neccessarily reproduce much of a speedup from this change, but I could reproduce a very measureable reduction in GC time with e.g. Rally's PMC (4g heap node and bulk requests of size 5k saw a reduction in young GC time by ~10% for me) --- .../http/netty4/Netty4HttpRequest.java | 6 ++++- .../http/netty4/Netty4HttpRequestHandler.java | 22 +++++-------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index ffabe5cbbe224..0a3fa6b5cd451 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -152,7 +152,11 @@ public HttpRequest removeHeader(String header) { @Override public Netty4HttpResponse createResponse(RestStatus status, BytesReference content) { - return new Netty4HttpResponse(this, status, content); + try { + return new Netty4HttpResponse(this, status, content); + } finally { + request.release(); + } } public FullHttpRequest nettyRequest() { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index cad95d2627083..3b6cab058e285 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -19,11 +19,9 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; @@ -41,19 +39,9 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) { Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); - + boolean success = false; try { - final FullHttpRequest copiedRequest = - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); - - Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence()); - + Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence()); if (request.decoderResult().isFailure()) { Throwable cause = request.decoderResult().cause(); if (cause instanceof Error) { @@ -65,9 +53,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest Date: Thu, 18 Jul 2019 16:15:37 +0200 Subject: [PATCH 02/19] fix --- .../http/netty4/Netty4HttpRequest.java | 14 +++++++++----- .../http/netty4/Netty4HttpRequestHandler.java | 4 ++-- .../org/elasticsearch/http/DefaultRestChannel.java | 3 ++- .../java/org/elasticsearch/http/HttpRequest.java | 6 ++++++ 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 0a3fa6b5cd451..deffc2aa52a71 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class Netty4HttpRequest implements HttpRequest { @@ -48,6 +49,7 @@ public class Netty4HttpRequest implements HttpRequest { private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; + private final AtomicBoolean released = new AtomicBoolean(false); Netty4HttpRequest(FullHttpRequest request, int sequence) { this.request = request; @@ -108,6 +110,12 @@ public BytesReference content() { return content; } + @Override + public void release() { + if (released.compareAndSet(false, true)) { + request.release(); + } + } @Override public final Map> getHeaders() { @@ -152,11 +160,7 @@ public HttpRequest removeHeader(String header) { @Override public Netty4HttpResponse createResponse(RestStatus status, BytesReference content) { - try { - return new Netty4HttpResponse(this, status, content); - } finally { - request.release(); - } + return new Netty4HttpResponse(this, status, content); } public FullHttpRequest nettyRequest() { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 3b6cab058e285..7e7f45ef92e2e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -40,8 +40,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest Date: Thu, 18 Jul 2019 21:36:29 +0200 Subject: [PATCH 03/19] maybe fix --- .../org/elasticsearch/http/netty4/Netty4HttpRequest.java | 9 +++++++-- .../java/org/elasticsearch/http/DefaultRestChannel.java | 5 ++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index deffc2aa52a71..59d5053562e13 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -49,9 +49,13 @@ public class Netty4HttpRequest implements HttpRequest { private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; - private final AtomicBoolean released = new AtomicBoolean(false); + private final AtomicBoolean released; Netty4HttpRequest(FullHttpRequest request, int sequence) { + this(request, sequence, new AtomicBoolean(false)); + } + + private Netty4HttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { this.request = request; headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; @@ -60,6 +64,7 @@ public class Netty4HttpRequest implements HttpRequest { } else { this.content = BytesArray.EMPTY; } + this.released = released; } @Override @@ -155,7 +160,7 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new Netty4HttpRequest(requestWithoutHeader, sequence); + return new Netty4HttpRequest(requestWithoutHeader, sequence, released); } @Override diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 50f6f6c3f7978..082ecfe458426 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() { @Override public void sendResponse(RestResponse restResponse) { - final ArrayList toClose = new ArrayList<>(3); + final ArrayList toClose = new ArrayList<>(4); + toClose.add(httpRequest::release); if (isCloseConnection()) { toClose.add(() -> CloseableChannel.closeChannel(httpChannel)); } @@ -100,7 +101,6 @@ public void sendResponse(RestResponse restResponse) { } final HttpResponse httpResponse = httpRequest.createResponse(restResponse.status(), finalContent); - httpRequest.release(); // TODO: Ideally we should move the setting of Cors headers into :server // NioCorsHandler.setCorsResponseHeaders(nettyRequest, resp, corsConfig); @@ -130,7 +130,6 @@ public void sendResponse(RestResponse restResponse) { success = true; } finally { if (success == false) { - httpRequest.release(); Releasables.close(toClose); } } From aa0adb5e63ae15c8d6947b5d064bc67dbf5d649a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 08:01:31 +0200 Subject: [PATCH 04/19] stop retaining reference to actual request in put pipeline rest action --- .../rest/action/ingest/RestPutPipelineAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 9cd66c8c9e456..a710c7c67b6a6 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -47,7 +48,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { Tuple sourceTuple = restRequest.contentOrSourceParam(); - PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1()); + PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), + new BytesArray(BytesReference.toBytes(sourceTuple.v2())), sourceTuple.v1()); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout())); return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel)); From fe122047c06972d3a6936902a07501196a317863 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 11:11:54 +0200 Subject: [PATCH 05/19] add assertion --- .../java/org/elasticsearch/http/netty4/Netty4HttpRequest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 59d5053562e13..4a61e264502a7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -112,6 +112,7 @@ public String uri() { @Override public BytesReference content() { + assert released.get() == false; return content; } From 49fc02737e32cb0ecfc959c680f322b4eac03d2b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 13:06:13 +0200 Subject: [PATCH 06/19] add back empty line --- .../src/main/java/org/elasticsearch/http/DefaultRestChannel.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 082ecfe458426..0317fb03b0f7f 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -101,6 +101,7 @@ public void sendResponse(RestResponse restResponse) { } final HttpResponse httpResponse = httpRequest.createResponse(restResponse.status(), finalContent); + // TODO: Ideally we should move the setting of Cors headers into :server // NioCorsHandler.setCorsResponseHeaders(nettyRequest, resp, corsConfig); From 2f1fa3fa7a90a47d388b74c24b74887e932bba7d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 13:19:15 +0200 Subject: [PATCH 07/19] Fix NIO as well --- .../org/elasticsearch/http/nio/NioHttpRequest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 08937593f3ba6..fc0e9c8db5f3d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class NioHttpRequest implements HttpRequest { @@ -48,8 +49,13 @@ public class NioHttpRequest implements HttpRequest { private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; + private final AtomicBoolean released; NioHttpRequest(FullHttpRequest request, int sequence) { + this(request, sequence, new AtomicBoolean(false)); + } + + private NioHttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { this.request = request; headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; @@ -58,6 +64,7 @@ public class NioHttpRequest implements HttpRequest { } else { this.content = BytesArray.EMPTY; } + this.released = released; } @Override @@ -105,9 +112,16 @@ public String uri() { @Override public BytesReference content() { + assert released.get() == false; return content; } + @Override + public void release() { + if (released.compareAndSet(false, true)) { + request.release(); + } + } @Override public final Map> getHeaders() { From 957c8a17cfe20a0fcf5f55c5571e4c617283b05e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 13:35:22 +0200 Subject: [PATCH 08/19] fix nio as well --- .../http/nio/HttpReadWriteHandler.java | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 795cb42b805d5..257bc49d5837a 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -19,10 +19,8 @@ package org.elasticsearch.http.nio; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -159,17 +157,9 @@ private void handleRequest(Object msg) { final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; FullHttpRequest request = pipelinedRequest.getRequest(); + boolean success = false; try { - final FullHttpRequest copiedRequest = - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); - - NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence()); + NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence()); if (request.decoderResult().isFailure()) { Throwable cause = request.decoderResult().cause(); @@ -182,9 +172,11 @@ private void handleRequest(Object msg) { } else { transport.incomingRequest(httpRequest, nioHttpChannel); } + success = true; } finally { - // As we have copied the buffer, we can release the request - request.release(); + if (success == false) { + request.release(); + } } } From a36b914b09315ab77a3f0276171234f16df6dbff Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 19 Jul 2019 14:43:40 +0200 Subject: [PATCH 09/19] add comment on copying pipeline source --- .../elasticsearch/rest/action/ingest/RestPutPipelineAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index a710c7c67b6a6..d5a44bd523b03 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -48,6 +48,8 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { Tuple sourceTuple = restRequest.contentOrSourceParam(); + // Copying the body of the REST request here since they're used by the ingest service after this request has been responded to + // and depending on the underlying request buffer implementation the bytes might become unavailable at that point PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), new BytesArray(BytesReference.toBytes(sourceTuple.v2())), sourceTuple.v1()); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); From 2f4b9215efd08ed12dd7cd10b1eaee2e0d7058be Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 10 Aug 2019 14:16:18 +0200 Subject: [PATCH 10/19] only fast-path search+bulk req. --- .../http/netty4/Netty4HttpRequest.java | 18 ++++++++++++++++ .../http/nio/NioHttpRequest.java | 18 ++++++++++++++++ .../org/elasticsearch/http/HttpRequest.java | 12 +++++++++-- .../elasticsearch/rest/RestController.java | 11 +++++++--- .../org/elasticsearch/rest/RestHandler.java | 4 ++++ .../org/elasticsearch/rest/RestRequest.java | 21 +++++++++++++++++++ .../rest/action/document/RestBulkAction.java | 7 ++++++- .../action/ingest/RestPutPipelineAction.java | 6 +----- .../rest/action/search/RestSearchAction.java | 5 +++++ .../http/DefaultRestChannelTests.java | 14 +++++++++++++ .../rest/RestControllerTests.java | 14 +++++++++++++ .../test/rest/FakeRestRequest.java | 14 +++++++++++++ .../security/rest/SecurityRestFilter.java | 5 +++++ 13 files changed, 138 insertions(+), 11 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 4a61e264502a7..895bbf1ebb876 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -123,6 +125,22 @@ public void release() { } } + @Override + public HttpRequest releaseAndCopy() { + try { + return new Netty4HttpRequest( + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), + Unpooled.copiedBuffer(request.content()), request.headers(), request.trailingHeaders()), sequence); + } finally { + release(); + } + } + + @Override + public boolean isPooled() { + return request.content() instanceof UnpooledHeapByteBuf == false; + } + @Override public final Map> getHeaders() { return headers; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index fc0e9c8db5f3d..a4e6646bc95b0 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.http.nio; +import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -123,6 +125,22 @@ public void release() { } } + @Override + public HttpRequest releaseAndCopy() { + try { + return new NioHttpRequest( + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), + Unpooled.copiedBuffer(request.content()), request.headers(), request.trailingHeaders()), sequence); + } finally { + release(); + } + } + + @Override + public boolean isPooled() { + return request.content() instanceof UnpooledHeapByteBuf == false; + } + @Override public final Map> getHeaders() { return headers; diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index c8d3a0c55c584..86063dac41468 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -68,10 +68,18 @@ enum HttpVersion { */ HttpResponse createResponse(RestStatus status, BytesReference content); + boolean isPooled(); + /** * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} * after this method has been invoked is undefined and implementation specific. */ - default void release() { - } + void release(); + + /** + * If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases + * any resources associated with this instance. If the instance does not use any shared resources, returns itself. + * @return a safe unpooled http request + */ + HttpRequest releaseAndCopy(); } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 2bab3f6e7662d..c4ef557b76fb3 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -215,13 +215,18 @@ private boolean dispatchRequest(RestRequest request, RestChannel channel, RestHa } RestChannel responseChannel = channel; try { + final boolean copiesRequest = handler.allowsUnsafeRequest() == false; + final int contentMemoryUsed = copiesRequest ? 2 * contentLength : contentLength; if (handler.canTripCircuitBreaker()) { - inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentMemoryUsed, ""); } else { - inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentMemoryUsed); } // iff we could reserve bytes for the request we need to send the response also over this channel - responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentMemoryUsed); + if (copiesRequest) { + request = RestRequest.maybeSafeCopy(request); + } handler.handleRequest(request, responseChannel, client); } catch (Exception e) { responseChannel.sendResponse(new BytesRestResponse(responseChannel, e)); diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java index 1ebc7a7fd1bd2..e3cf7091fc87a 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java @@ -47,4 +47,8 @@ default boolean canTripCircuitBreaker() { default boolean supportsContentStream() { return false; } + + default boolean allowsUnsafeRequest() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 23e72d0c1f1d5..c0fafd203847e 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -97,6 +97,27 @@ protected RestRequest(RestRequest restRequest) { restRequest.getHttpRequest(), restRequest.getHttpChannel()); } + public static RestRequest maybeSafeCopy(RestRequest request) { + if (request.httpRequest.isPooled() == false) { + return request; + } + final HttpRequest copiedHttpRequest = request.httpRequest.releaseAndCopy(); + assert copiedHttpRequest != request.httpRequest : "Should not copy rest request if http request does not need safe copying"; + final Map remainingParams; + if (request.params.size() == request.consumedParams.size()) { + remainingParams = Collections.emptyMap(); + } else { + remainingParams = new HashMap<>(); + for (Map.Entry paramEntry : request.params.entrySet()) { + if (request.consumedParams.contains(paramEntry.getKey()) == false) { + remainingParams.put(paramEntry.getKey(), paramEntry.getValue()); + } + } + } + return new RestRequest( + request.xContentRegistry, remainingParams, request.rawPath(), request.headers, copiedHttpRequest, request.httpChannel); + } + /** * Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be * decoded diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 08ddbb728c1ab..fcbec3b64c36b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -83,7 +83,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (defaultType == null) { defaultType = MapperService.SINGLE_MAPPING_NAME; } else { - deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); + deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); } String defaultRouting = request.param("routing"); FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); @@ -104,4 +104,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC public boolean supportsContentStream() { return true; } + + @Override + public boolean allowsUnsafeRequest() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index d5a44bd523b03..9cd66c8c9e456 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -48,10 +47,7 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { Tuple sourceTuple = restRequest.contentOrSourceParam(); - // Copying the body of the REST request here since they're used by the ingest service after this request has been responded to - // and depending on the underlying request buffer implementation the bytes might become unavailable at that point - PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), - new BytesArray(BytesReference.toBytes(sourceTuple.v2())), sourceTuple.v1()); + PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1()); request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout())); return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel)); diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 95695bec4f0c1..6c41c01710714 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -307,4 +307,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea protected Set responseParams() { return RESPONSE_PARAMS; } + + @Override + public boolean allowsUnsafeRequest() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index c58ec6a4becbb..988264cddbedf 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -450,6 +450,20 @@ public HttpRequest removeHeader(String header) { public HttpResponse createResponse(RestStatus status, BytesReference content) { return new TestResponse(status, content); } + + @Override + public boolean isPooled() { + return false; + } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class TestResponse implements HttpResponse { diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 4a7637cfb9105..00909d52ad550 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -554,6 +554,20 @@ public HttpRequest removeHeader(String header) { public HttpResponse createResponse(RestStatus status, BytesReference content) { return null; } + + @Override + public boolean isPooled() { + return false; + } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } }, null); final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index a659d6af5c6aa..69a4827fc7cd2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -113,6 +113,20 @@ public boolean containsHeader(String name) { } }; } + + @Override + public boolean isPooled() { + return false; + } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class FakeHttpChannel implements HttpChannel { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java index 6a5f49545933f..16cc445e96110 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java @@ -80,6 +80,11 @@ public boolean supportsContentStream() { return restHandler.supportsContentStream(); } + @Override + public boolean allowsUnsafeRequest() { + return restHandler.allowsUnsafeRequest(); + } + private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException { if (restHandler instanceof RestRequestFilter) { return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest); From 80368bbf1c468d70dbc5124806cc27ddba4a3a9d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 10 Aug 2019 17:52:37 +0200 Subject: [PATCH 11/19] quick and dirty --- .../org/elasticsearch/rest/RestRequest.java | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index c0fafd203847e..82792ea1f268e 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params { private final String rawPath; private final Set consumedParams = new HashSet<>(); private final SetOnce xContentType = new SetOnce<>(); - private final HttpRequest httpRequest; private final HttpChannel httpChannel; + private HttpRequest httpRequest; + private boolean contentConsumed = false; public boolean isContentConsumed() { @@ -98,24 +99,8 @@ protected RestRequest(RestRequest restRequest) { } public static RestRequest maybeSafeCopy(RestRequest request) { - if (request.httpRequest.isPooled() == false) { - return request; - } - final HttpRequest copiedHttpRequest = request.httpRequest.releaseAndCopy(); - assert copiedHttpRequest != request.httpRequest : "Should not copy rest request if http request does not need safe copying"; - final Map remainingParams; - if (request.params.size() == request.consumedParams.size()) { - remainingParams = Collections.emptyMap(); - } else { - remainingParams = new HashMap<>(); - for (Map.Entry paramEntry : request.params.entrySet()) { - if (request.consumedParams.contains(paramEntry.getKey()) == false) { - remainingParams.put(paramEntry.getKey(), paramEntry.getValue()); - } - } - } - return new RestRequest( - request.xContentRegistry, remainingParams, request.rawPath(), request.headers, copiedHttpRequest, request.httpChannel); + request.httpRequest = request.httpRequest.releaseAndCopy(); + return request; } /** From 32cfa402f8a024ac9f302c3093200908b6079c88 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 10 Aug 2019 20:10:31 +0200 Subject: [PATCH 12/19] too ambitious --- .../java/org/elasticsearch/rest/RestController.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index c4ef557b76fb3..9fbe1c94a6c04 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -215,16 +215,15 @@ private boolean dispatchRequest(RestRequest request, RestChannel channel, RestHa } RestChannel responseChannel = channel; try { - final boolean copiesRequest = handler.allowsUnsafeRequest() == false; - final int contentMemoryUsed = copiesRequest ? 2 * contentLength : contentLength; if (handler.canTripCircuitBreaker()) { - inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentMemoryUsed, ""); + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); } else { - inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentMemoryUsed); + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); } // iff we could reserve bytes for the request we need to send the response also over this channel - responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentMemoryUsed); - if (copiesRequest) { + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + if (handler.allowsUnsafeRequest() == false) { + // TODO: Count requests double in the circuit breaker if they need copying? request = RestRequest.maybeSafeCopy(request); } handler.handleRequest(request, responseChannel, client); From 47ec7058bc460b08123b06494cb37ed546d792f8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 11 Aug 2019 16:34:17 +0200 Subject: [PATCH 13/19] optimize some more --- .../org/elasticsearch/http/netty4/Netty4HttpRequest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 895bbf1ebb876..8ac83400cc42d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -47,11 +47,11 @@ import java.util.stream.Collectors; public class Netty4HttpRequest implements HttpRequest { - private final FullHttpRequest request; - private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; private final AtomicBoolean released; + private final FullHttpRequest request; + private BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { this(request, sequence, new AtomicBoolean(false)); @@ -122,11 +122,15 @@ public BytesReference content() { public void release() { if (released.compareAndSet(false, true)) { request.release(); + content = null; } } @Override public HttpRequest releaseAndCopy() { + if (isPooled() == false) { + return this; + } try { return new Netty4HttpRequest( new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), From bd1ab0f3bcbf5255d700552c9695091ca54ef3b8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 12 Aug 2019 07:03:55 +0200 Subject: [PATCH 14/19] faster --- .../http/netty4/Netty4HttpRequest.java | 32 +++++++++-------- .../http/nio/NioHttpRequest.java | 36 +++++++++++-------- .../org/elasticsearch/http/HttpRequest.java | 2 -- .../elasticsearch/rest/RestController.java | 6 ++-- .../org/elasticsearch/rest/RestHandler.java | 10 +++++- .../org/elasticsearch/rest/RestRequest.java | 10 ++++-- .../rest/action/document/RestBulkAction.java | 2 +- .../rest/action/search/RestSearchAction.java | 2 +- .../http/DefaultRestChannelTests.java | 5 --- .../rest/RestControllerTests.java | 5 --- .../test/rest/FakeRestRequest.java | 5 --- .../security/rest/SecurityRestFilter.java | 4 +-- 12 files changed, 61 insertions(+), 58 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 8ac83400cc42d..02470ccc8da8e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -30,7 +31,6 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -51,6 +51,7 @@ public class Netty4HttpRequest implements HttpRequest { private final int sequence; private final AtomicBoolean released; private final FullHttpRequest request; + private final boolean pooled; private BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { @@ -58,14 +59,17 @@ public class Netty4HttpRequest implements HttpRequest { } private Netty4HttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { + this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf, + Netty4Utils.toBytesReference(request.content())); + } + + private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + BytesReference content) { this.request = request; - headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - if (request.content().isReadable()) { - this.content = Netty4Utils.toBytesReference(request.content()); - } else { - this.content = BytesArray.EMPTY; - } + this.headers = headers; + this.content = content; + this.pooled = pooled; this.released = released; } @@ -128,23 +132,21 @@ public void release() { @Override public HttpRequest releaseAndCopy() { - if (isPooled() == false) { + assert content != null; + if (pooled == false) { return this; } try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); return new Netty4HttpRequest( - new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), - Unpooled.copiedBuffer(request.content()), request.headers(), request.trailingHeaders()), sequence); + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), + request.trailingHeaders()), + headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent)); } finally { release(); } } - @Override - public boolean isPooled() { - return request.content() instanceof UnpooledHeapByteBuf == false; - } - @Override public final Map> getHeaders() { return headers; diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index a4e6646bc95b0..10fbaa1ba511f 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.nio; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -30,7 +31,6 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -48,24 +48,28 @@ public class NioHttpRequest implements HttpRequest { private final FullHttpRequest request; - private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; private final AtomicBoolean released; + private final boolean pooled; + private BytesReference content; NioHttpRequest(FullHttpRequest request, int sequence) { this(request, sequence, new AtomicBoolean(false)); } private NioHttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { + this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf, + ByteBufUtils.toBytesReference(request.content())); + } + + private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + BytesReference content) { this.request = request; - headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - if (request.content().isReadable()) { - this.content = ByteBufUtils.toBytesReference(request.content()); - } else { - this.content = BytesArray.EMPTY; - } + this.headers = headers; + this.content = content; + this.pooled = pooled; this.released = released; } @@ -122,25 +126,27 @@ public BytesReference content() { public void release() { if (released.compareAndSet(false, true)) { request.release(); + content = null; } } @Override public HttpRequest releaseAndCopy() { + assert content != null; + if (pooled == false) { + return this; + } try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); return new NioHttpRequest( - new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), - Unpooled.copiedBuffer(request.content()), request.headers(), request.trailingHeaders()), sequence); + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), + request.trailingHeaders()), + headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent)); } finally { release(); } } - @Override - public boolean isPooled() { - return request.content() instanceof UnpooledHeapByteBuf == false; - } - @Override public final Map> getHeaders() { return headers; diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index 86063dac41468..4d67078fe571a 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -68,8 +68,6 @@ enum HttpVersion { */ HttpResponse createResponse(RestStatus status, BytesReference content); - boolean isPooled(); - /** * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} * after this method has been invoked is undefined and implementation specific. diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 9fbe1c94a6c04..b6331fdfe294b 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -222,9 +222,9 @@ private boolean dispatchRequest(RestRequest request, RestChannel channel, RestHa } // iff we could reserve bytes for the request we need to send the response also over this channel responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); - if (handler.allowsUnsafeRequest() == false) { - // TODO: Count requests double in the circuit breaker if they need copying? - request = RestRequest.maybeSafeCopy(request); + // TODO: Count requests double in the circuit breaker if they need copying? + if (handler.allowsUnsafeBuffers() == false) { + request.ensureSafeBuffers(); } handler.handleRequest(request, responseChannel, client); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java index e3cf7091fc87a..605dd41078a54 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java @@ -48,7 +48,15 @@ default boolean supportsContentStream() { return false; } - default boolean allowsUnsafeRequest() { + /** + * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return + * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the + * {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers + * {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}. + * + * @return true iff the handler supports requests that make use of pooled buffers + */ + default boolean allowsUnsafeBuffers() { return false; } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 82792ea1f268e..4a8fb44fc4299 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -98,9 +98,13 @@ protected RestRequest(RestRequest restRequest) { restRequest.getHttpRequest(), restRequest.getHttpChannel()); } - public static RestRequest maybeSafeCopy(RestRequest request) { - request.httpRequest = request.httpRequest.releaseAndCopy(); - return request; + /** + * Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request + * with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely + * handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}. + */ + void ensureSafeBuffers() { + httpRequest = httpRequest.releaseAndCopy(); } /** diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 6af3c4463eb8f..4eb2a49884f04 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -105,7 +105,7 @@ public boolean supportsContentStream() { } @Override - public boolean allowsUnsafeRequest() { + public boolean allowsUnsafeBuffers() { return true; } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 5efa18f737de2..a3e900f6c3f69 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -307,7 +307,7 @@ protected Set responseParams() { } @Override - public boolean allowsUnsafeRequest() { + public boolean allowsUnsafeBuffers() { return true; } } diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 988264cddbedf..e8fcc83d9d795 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -451,11 +451,6 @@ public HttpResponse createResponse(RestStatus status, BytesReference content) { return new TestResponse(status, content); } - @Override - public boolean isPooled() { - return false; - } - @Override public void release() { } diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 00909d52ad550..b60e89d1e5e2c 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -555,11 +555,6 @@ public HttpResponse createResponse(RestStatus status, BytesReference content) { return null; } - @Override - public boolean isPooled() { - return false; - } - @Override public void release() { } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 69a4827fc7cd2..2f2f5fb76bfe7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -114,11 +114,6 @@ public boolean containsHeader(String name) { }; } - @Override - public boolean isPooled() { - return false; - } - @Override public void release() { } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java index 16cc445e96110..1fe4ecf246529 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java @@ -81,8 +81,8 @@ public boolean supportsContentStream() { } @Override - public boolean allowsUnsafeRequest() { - return restHandler.allowsUnsafeRequest(); + public boolean allowsUnsafeBuffers() { + return restHandler.allowsUnsafeBuffers(); } private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException { From 64581f5584ab84f83ba54a14a4226c7fed2f1548 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 12 Aug 2019 08:46:06 +0200 Subject: [PATCH 15/19] stop releasing unpooled buffers --- .../org/elasticsearch/http/netty4/Netty4HttpRequest.java | 6 +++--- .../java/org/elasticsearch/http/nio/NioHttpRequest.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 02470ccc8da8e..317a0b0da22f5 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -59,7 +59,7 @@ public class Netty4HttpRequest implements HttpRequest { } private Netty4HttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { - this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf, + this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf == false, Netty4Utils.toBytesReference(request.content())); } @@ -124,10 +124,10 @@ public BytesReference content() { @Override public void release() { - if (released.compareAndSet(false, true)) { + if (pooled && released.compareAndSet(false, true)) { request.release(); - content = null; } + content = null; } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 10fbaa1ba511f..e2198cad32a6b 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -59,7 +59,7 @@ public class NioHttpRequest implements HttpRequest { } private NioHttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { - this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf, + this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf == false, ByteBufUtils.toBytesReference(request.content())); } @@ -124,10 +124,10 @@ public BytesReference content() { @Override public void release() { - if (released.compareAndSet(false, true)) { + if (pooled && released.compareAndSet(false, true)) { request.release(); - content = null; } + content = null; } @Override From e5ebd7ed0500176040bf2100e275f8efc8408c5d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 12 Aug 2019 10:33:14 +0200 Subject: [PATCH 16/19] nicer assertion --- .../org/elasticsearch/http/netty4/Netty4HttpRequest.java | 5 +++-- .../main/java/org/elasticsearch/http/nio/NioHttpRequest.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 317a0b0da22f5..445a906a0b7c7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -118,8 +118,9 @@ public String uri() { @Override public BytesReference content() { - assert released.get() == false; - return content; + final BytesReference contentRef = content; + assert contentRef != null && released.get() == false; + return contentRef; } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index e2198cad32a6b..10dbe8381f689 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -118,8 +118,9 @@ public String uri() { @Override public BytesReference content() { - assert released.get() == false; - return content; + final BytesReference contentRef = content; + assert contentRef != null && released.get() == false; + return contentRef; } @Override From 9b6080350d597f705ed86c4ab4acd5177055976a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 15 Nov 2019 15:16:19 +0100 Subject: [PATCH 17/19] CR: don't optimize NIO transport for now --- .../http/nio/HttpReadWriteHandler.java | 41 +++++++++------ .../http/nio/NioHttpRequest.java | 52 ++++--------------- 2 files changed, 35 insertions(+), 58 deletions(-) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 257bc49d5837a..736e89c32dd56 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -19,8 +19,10 @@ package org.elasticsearch.http.nio; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -157,26 +159,31 @@ private void handleRequest(Object msg) { final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; FullHttpRequest request = pipelinedRequest.getRequest(); - boolean success = false; + final FullHttpRequest copiedRequest; try { - NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence()); - - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); - } else { - transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); - } - } else { - transport.incomingRequest(httpRequest, nioHttpChannel); - } - success = true; + copiedRequest = new DefaultFullHttpRequest( + request.protocolVersion(), + request.method(), + request.uri(), + Unpooled.copiedBuffer(request.content()), + request.headers(), + request.trailingHeaders()); } finally { - if (success == false) { - request.release(); + // As we have copied the buffer, we can release the request + request.release(); + } + NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence()); + + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); + } else { + transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); } + } else { + transport.incomingRequest(httpRequest, nioHttpChannel); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 10dbe8381f689..8e17d37699cdd 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -19,9 +19,6 @@ package org.elasticsearch.http.nio; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -31,6 +28,7 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -42,35 +40,24 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class NioHttpRequest implements HttpRequest { private final FullHttpRequest request; + private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; - private final AtomicBoolean released; - private final boolean pooled; - private BytesReference content; NioHttpRequest(FullHttpRequest request, int sequence) { - this(request, sequence, new AtomicBoolean(false)); - } - - private NioHttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { - this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf == false, - ByteBufUtils.toBytesReference(request.content())); - } - - private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, - BytesReference content) { this.request = request; + headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - this.headers = headers; - this.content = content; - this.pooled = pooled; - this.released = released; + if (request.content().isReadable()) { + this.content = ByteBufUtils.toBytesReference(request.content()); + } else { + this.content = BytesArray.EMPTY; + } } @Override @@ -118,34 +105,17 @@ public String uri() { @Override public BytesReference content() { - final BytesReference contentRef = content; - assert contentRef != null && released.get() == false; - return contentRef; + return content; } @Override public void release() { - if (pooled && released.compareAndSet(false, true)) { - request.release(); - } - content = null; + // NioHttpRequest works from copied unpooled bytes no need to release anything } @Override public HttpRequest releaseAndCopy() { - assert content != null; - if (pooled == false) { - return this; - } - try { - final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); - return new NioHttpRequest( - new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), - request.trailingHeaders()), - headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent)); - } finally { - release(); - } + return this; } @Override From 1812be4bfa6a32da8e285b30f538d137f4b3d1e9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 22 Nov 2019 15:45:21 +0100 Subject: [PATCH 18/19] CR: make content handling less weird --- .../elasticsearch/http/netty4/Netty4HttpRequest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 445a906a0b7c7..94fe708a50b99 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -52,7 +52,7 @@ public class Netty4HttpRequest implements HttpRequest { private final AtomicBoolean released; private final FullHttpRequest request; private final boolean pooled; - private BytesReference content; + private final BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { this(request, sequence, new AtomicBoolean(false)); @@ -118,9 +118,8 @@ public String uri() { @Override public BytesReference content() { - final BytesReference contentRef = content; - assert contentRef != null && released.get() == false; - return contentRef; + assert released.get() == false; + return content; } @Override @@ -128,12 +127,11 @@ public void release() { if (pooled && released.compareAndSet(false, true)) { request.release(); } - content = null; } @Override public HttpRequest releaseAndCopy() { - assert content != null; + assert released.get() == false; if (pooled == false) { return this; } From 9ab9b4a22086adcdfa95c581598b38c556ff62ee Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 28 Nov 2019 07:39:46 +0100 Subject: [PATCH 19/19] remove unpooled hack --- .../elasticsearch/http/netty4/Netty4HttpRequest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 94fe708a50b99..e0ad3007c98a9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledHeapByteBuf; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -55,11 +54,7 @@ public class Netty4HttpRequest implements HttpRequest { private final BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { - this(request, sequence, new AtomicBoolean(false)); - } - - private Netty4HttpRequest(FullHttpRequest request, int sequence, AtomicBoolean released) { - this(request, new HttpHeadersMap(request.headers()), sequence, released, request.content() instanceof UnpooledHeapByteBuf == false, + this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content())); } @@ -184,7 +179,8 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new Netty4HttpRequest(requestWithoutHeader, sequence, released); + return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, + pooled, content); } @Override