Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.bytes.AbstractBytesReference;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;

Expand Down Expand Up @@ -70,7 +71,8 @@ static ByteBuf toByteBuf(final BytesReference reference) {
}

static BytesReference toBytesReference(final ByteBuf buffer) {
return new ByteBufBytesReference(buffer, buffer.readableBytes());
final int readableBytes = buffer.readableBytes();
return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes);
}

private static class ByteBufBytesReference extends AbstractBytesReference {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

package org.elasticsearch.http.nio;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
Expand Down Expand Up @@ -158,32 +156,25 @@ public void close() throws IOException {
private void handleRequest(Object msg) {
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
FullHttpRequest request = pipelinedRequest.getRequest();

final FullHttpRequest copiedRequest;
boolean success = false;
NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());
try {
copiedRequest = new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
}
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
transport.incomingRequest(httpRequest, nioHttpChannel);
}
success = true;
} finally {
if (success == false) {
request.release();
}
} else {
transport.incomingRequest(httpRequest, nioHttpChannel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.http.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -28,7 +30,6 @@
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -40,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class NioHttpRequest implements HttpRequest {
Expand All @@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest {
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final boolean pooled;

NioHttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
ByteBufUtils.toBytesReference(request.content()));
}

private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = ByteBufUtils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}

@Override
Expand Down Expand Up @@ -105,17 +113,32 @@ public String uri() {

@Override
public BytesReference content() {
assert released.get() == false;
return content;
}

@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}

@Override
public HttpRequest releaseAndCopy() {
return this;
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new NioHttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
} finally {
release();
}
}

@Override
Expand Down Expand Up @@ -156,7 +179,8 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new NioHttpRequest(requestWithoutHeader, sequence);
return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}

@Override
Expand Down