Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8c12be3
Stop Copying Every Http Request in Message Handler
original-brownbear Jul 18, 2019
cc0aaf6
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 18, 2019
0e8435c
fix
original-brownbear Jul 18, 2019
52bc3a6
maybe fix
original-brownbear Jul 18, 2019
1647a74
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 19, 2019
f413199
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 19, 2019
aa0adb5
stop retaining reference to actual request in put pipeline rest action
original-brownbear Jul 19, 2019
fe12204
add assertion
original-brownbear Jul 19, 2019
558178e
Merge remote-tracking branch 'elastic' into never-copy-http-buffer
original-brownbear Jul 19, 2019
49fc027
add back empty line
original-brownbear Jul 19, 2019
2f1fa3f
Fix NIO as well
original-brownbear Jul 19, 2019
957c8a1
fix nio as well
original-brownbear Jul 19, 2019
a36b914
add comment on copying pipeline source
original-brownbear Jul 19, 2019
1002a54
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 24, 2019
c9bc6cb
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 26, 2019
635f95f
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Jul 28, 2019
b441280
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Aug 10, 2019
2f4b921
only fast-path search+bulk req.
original-brownbear Aug 10, 2019
80368bb
quick and dirty
original-brownbear Aug 10, 2019
32cfa40
too ambitious
original-brownbear Aug 10, 2019
ee27d5b
Merge branch 'never-copy-http-buffer' of github.com:original-brownbea…
original-brownbear Aug 11, 2019
11a4e3e
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Aug 11, 2019
47ec705
optimize some more
original-brownbear Aug 11, 2019
6144ebe
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Aug 12, 2019
bd1ab0f
faster
original-brownbear Aug 12, 2019
64581f5
stop releasing unpooled buffers
original-brownbear Aug 12, 2019
e5ebd7e
nicer assertion
original-brownbear Aug 12, 2019
bf95afa
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Sep 25, 2019
4f00013
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Nov 15, 2019
9b60803
CR: don't optimize NIO transport for now
original-brownbear Nov 15, 2019
b161845
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Nov 22, 2019
1812be4
CR: make content handling less weird
original-brownbear Nov 22, 2019
2e8840a
Merge remote-tracking branch 'elastic/master' into never-copy-http-bu…
original-brownbear Nov 28, 2019
9ab9b4a
remove unpooled hack
original-brownbear Nov 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -28,7 +30,6 @@
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -41,23 +42,30 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class Netty4HttpRequest implements HttpRequest {
private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final boolean pooled;
private final BytesReference content;

Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = Netty4Utils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}

@Override
Expand Down Expand Up @@ -105,9 +113,33 @@ public String uri() {

@Override
public BytesReference content() {
assert released.get() == false;
return content;
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}

@Override
public HttpRequest releaseAndCopy() {
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down Expand Up @@ -147,7 +179,8 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, sequence);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
Expand All @@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
final FullHttpRequest copiedRequest;
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
serverTransport.incomingRequest(httpRequest, channel);
}
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public BytesReference content() {
return content;
}

@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() {

@Override
public void sendResponse(RestResponse restResponse) {
final ArrayList<Releasable> toClose = new ArrayList<>(3);
final ArrayList<Releasable> toClose = new ArrayList<>(4);
toClose.add(httpRequest::release);
if (isCloseConnection()) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/http/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,16 @@ enum HttpVersion {
*/
HttpResponse createResponse(RestStatus status, BytesReference content);

/**
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
* after this method has been invoked is undefined and implementation specific.
*/
void release();

/**
* If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
* any resources associated with this instance. If the instance does not use any shared resources, returns itself.
* @return a safe unpooled http request
*/
HttpRequest releaseAndCopy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ default boolean canTripCircuitBreaker() {
default boolean supportsContentStream() {
return false;
}

/**
* Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
* {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
* {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
* {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
*
* @return true iff the handler supports requests that make use of pooled buffers
*/
default boolean allowsUnsafeBuffers() {
return false;
}
}
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params {
private final String rawPath;
private final Set<String> consumedParams = new HashSet<>();
private final SetOnce<XContentType> xContentType = new SetOnce<>();
private final HttpRequest httpRequest;
private final HttpChannel httpChannel;

private HttpRequest httpRequest;

private boolean contentConsumed = false;

public boolean isContentConsumed() {
Expand Down Expand Up @@ -97,6 +98,15 @@ protected RestRequest(RestRequest restRequest) {
restRequest.getHttpRequest(), restRequest.getHttpChannel());
}

/**
* Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
* with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
* handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
*/
void ensureSafeBuffers() {
httpRequest = httpRequest.releaseAndCopy();
}

/**
* Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be
* decoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
public boolean supportsContentStream() {
return true;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could optimize this and other spots where we return true even further by releasing the request at the earliest possible point instead of when sending out the response. I'd push that to a follow up though, as it's not entirely trivial to get this right.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return new TestResponse(status, content);
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class TestResponse implements HttpResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return null;
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}, null);

final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ public boolean containsHeader(String name) {
}
};
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class FakeHttpChannel implements HttpChannel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public boolean supportsContentStream() {
return restHandler.supportsContentStream();
}

@Override
public boolean allowsUnsafeBuffers() {
return restHandler.allowsUnsafeBuffers();
}

private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException {
if (restHandler instanceof RestRequestFilter) {
return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);
Expand Down