Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundHandler;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.Transports;

Expand All @@ -55,8 +56,9 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
this.transport = transport;
final ThreadPool threadPool = transport.getThreadPool();
final InboundHandler inboundHandler = transport.getInboundHandler();
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
transport::inboundMessage, transport::inboundDecodeException);
transport.getInflightBreaker(), inboundHandler::getRequestHandler, transport::inboundMessage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand All @@ -30,10 +31,12 @@
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.Page;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundHandler;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;
import java.util.function.Supplier;

public class TcpReadWriteHandler extends BytesWriteHandler {

Expand All @@ -43,8 +46,10 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) {
this.channel = channel;
final ThreadPool threadPool = transport.getThreadPool();
final Supplier<CircuitBreaker> breaker = transport.getInflightBreaker();
final InboundHandler inboundHandler = transport.getInboundHandler();
this.pipeline = new InboundPipeline(transport.getVersion(), transport.getStatsTracker(), recycler, threadPool::relativeTimeInMillis,
transport::inboundMessage, transport::inboundDecodeException);
breaker, inboundHandler::getRequestHandler, transport::inboundMessage);
}

@Override
Expand Down
158 changes: 137 additions & 21 deletions server/src/main/java/org/elasticsearch/transport/InboundAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.transport;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand All @@ -27,44 +29,69 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class InboundAggregator implements Releasable {

private final Supplier<CircuitBreaker> circuitBreaker;
private final Predicate<String> requestCanTripBreaker;

private ReleasableBytesReference firstContent;
private ArrayList<ReleasableBytesReference> contentAggregation;
private Header currentHeader;
private Exception aggregationException;
private boolean canTripBreaker = true;
private boolean isClosed = false;

public InboundAggregator(Supplier<CircuitBreaker> circuitBreaker,
Function<String, RequestHandlerRegistry<TransportRequest>> registryFunction) {
this(circuitBreaker, (Predicate<String>) actionName -> {
final RequestHandlerRegistry<TransportRequest> reg = registryFunction.apply(actionName);
if (reg == null) {
throw new ActionNotFoundTransportException(actionName);
} else {
return reg.canTripCircuitBreaker();
}
});
}

// Visible for testing
InboundAggregator(Supplier<CircuitBreaker> circuitBreaker, Predicate<String> requestCanTripBreaker) {
this.circuitBreaker = circuitBreaker;
this.requestCanTripBreaker = requestCanTripBreaker;
}

public void headerReceived(Header header) {
ensureOpen();
assert isAggregating() == false;
assert firstContent == null && contentAggregation == null;
currentHeader = header;
if (currentHeader.isRequest() && currentHeader.needsToReadVariableHeader() == false) {
initializeRequestState();
}
}

public void aggregate(ReleasableBytesReference content) {
ensureOpen();
assert isAggregating();
if (isFirstContent()) {
firstContent = content.retain();
} else {
if (contentAggregation == null) {
contentAggregation = new ArrayList<>(4);
contentAggregation.add(firstContent);
firstContent = null;
if (isShortCircuited() == false) {
if (isFirstContent()) {
firstContent = content.retain();
} else {
if (contentAggregation == null) {
contentAggregation = new ArrayList<>(4);
assert firstContent != null;
contentAggregation.add(firstContent);
firstContent = null;
}
contentAggregation.add(content.retain());
}
contentAggregation.add(content.retain());
}
}

public Header cancelAggregation() {
ensureOpen();
assert isAggregating();
final Header header = this.currentHeader;
closeCurrentAggregation();
return header;
}

public InboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
Expand All @@ -77,16 +104,30 @@ public InboundMessage finishAggregation() throws IOException {
final CompositeBytesReference content = new CompositeBytesReference(references);
releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references));
}
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent);
resetCurrentAggregation();

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
aggregated.getHeader().finishParsingHeader(aggregated.openOrGetStreamInput());
if (aggregated.getHeader().isRequest()) {
initializeRequestState();
}
}
if (isShortCircuited() == false) {
checkBreaker(aggregated.getHeader(), aggregated.getContentLength(), breakerControl);
}
if (isShortCircuited()) {
aggregated.close();
success = true;
return new InboundMessage(aggregated.getHeader(), aggregationException);
} else {
success = true;
return aggregated;
}
success = true;
return aggregated;
} finally {
resetCurrentAggregation();
if (success == false) {
aggregated.close();
}
Expand All @@ -97,6 +138,14 @@ public boolean isAggregating() {
return currentHeader != null;
}

private void shortCircuit(Exception exception) {
this.aggregationException = exception;
}

private boolean isShortCircuited() {
return aggregationException != null;
}

private boolean isFirstContent() {
return firstContent == null && contentAggregation == null;
}
Expand All @@ -108,23 +157,90 @@ public void close() {
}

private void closeCurrentAggregation() {
releaseContent();
resetCurrentAggregation();
}

private void releaseContent() {
if (contentAggregation == null) {
Releasables.close(firstContent);
} else {
Releasables.close(contentAggregation);
}
resetCurrentAggregation();
}

private void resetCurrentAggregation() {
firstContent = null;
contentAggregation = null;
currentHeader = null;
aggregationException = null;
canTripBreaker = true;
}

private void ensureOpen() {
if (isClosed) {
throw new IllegalStateException("Aggregator is already closed");
}
}

private void initializeRequestState() {
assert currentHeader.needsToReadVariableHeader() == false;
assert currentHeader.isRequest();
if (currentHeader.isHandshake()) {
canTripBreaker = false;
return;
}

final String actionName = currentHeader.getActionName();
try {
canTripBreaker = requestCanTripBreaker.test(actionName);
} catch (ActionNotFoundTransportException e) {
shortCircuit(e);
}
}

private void checkBreaker(final Header header, final int contentLength, final BreakerControl breakerControl) {
if (header.isRequest() == false) {
return;
}
assert header.needsToReadVariableHeader() == false;

if (canTripBreaker) {
try {
circuitBreaker.get().addEstimateBytesAndMaybeBreak(contentLength, header.getActionName());
breakerControl.setReservedBytes(contentLength);
} catch (CircuitBreakingException e) {
shortCircuit(e);
}
} else {
circuitBreaker.get().addWithoutBreaking(contentLength);
breakerControl.setReservedBytes(contentLength);
}
}

private static class BreakerControl implements Releasable {

private static final int CLOSED = -1;

private final Supplier<CircuitBreaker> circuitBreaker;
private final AtomicInteger bytesToRelease = new AtomicInteger(0);

private BreakerControl(Supplier<CircuitBreaker> circuitBreaker) {
this.circuitBreaker = circuitBreaker;
}

private void setReservedBytes(int reservedBytes) {
final boolean set = bytesToRelease.compareAndSet(0, reservedBytes);
assert set : "Expected bytesToRelease to be 0, found " + bytesToRelease.get();
}

@Override
public void close() {
final int toRelease = bytesToRelease.getAndSet(CLOSED);
assert toRelease != CLOSED;
if (toRelease > 0) {
circuitBreaker.get().addWithoutBreaking(-toRelease);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class InboundDecoder implements Releasable {

private final Version version;
private final PageCacheRecycler recycler;
private Exception decodingException;
private TransportDecompressor decompressor;
private int totalNetworkSize = -1;
private int bytesConsumed = 0;
Expand Down Expand Up @@ -86,13 +85,6 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
return headerBytesToRead;
}
}
} else if (isDecodingFailed()) {
int bytesToConsume = Math.min(reference.length(), totalNetworkSize - bytesConsumed);
bytesConsumed += bytesToConsume;
if (isDone()) {
finishMessage(fragmentConsumer);
}
return bytesToConsume;
} else {
// There are a minimum number of bytes required to start decompression
if (decompressor != null && decompressor.canDecompress(reference.length()) == false) {
Expand Down Expand Up @@ -130,19 +122,12 @@ public void close() {
}

private void finishMessage(Consumer<Object> fragmentConsumer) {
Object finishMarker;
if (decodingException != null) {
finishMarker = decodingException;
} else {
finishMarker = END_CONTENT;
}
cleanDecodeState();
fragmentConsumer.accept(finishMarker);
fragmentConsumer.accept(END_CONTENT);
}

private void cleanDecodeState() {
IOUtils.closeWhileHandlingException(decompressor);
decodingException = null;
decompressor = null;
totalNetworkSize = -1;
bytesConsumed = 0;
Expand Down Expand Up @@ -190,7 +175,7 @@ private Header readHeader(int networkMessageSize, BytesReference bytesReference)
Header header = new Header(networkMessageSize, requestId, status, remoteVersion);
final IllegalStateException invalidVersion = ensureVersionCompatibility(remoteVersion, version, header.isHandshake());
if (invalidVersion != null) {
decodingException = invalidVersion;
throw invalidVersion;
} else {
if (remoteVersion.onOrAfter(TcpHeader.VERSION_WITH_HEADER_SIZE)) {
// Skip since we already have ensured enough data available
Expand All @@ -206,10 +191,6 @@ private boolean isOnHeader() {
return totalNetworkSize == -1;
}

private boolean isDecodingFailed() {
return decodingException != null;
}

private void ensureOpen() {
if (isClosed) {
throw new IllegalStateException("Decoder is already closed");
Expand Down
Loading