From 52b2016447c54f498d7507a9fbcd571033ab1470 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 13 Apr 2016 09:54:59 +0200 Subject: [PATCH 1/2] Limit request size on transport level With this commit we limit the size of all in-flight requests on transport level. The size is guarded by a circuit breaker and is based on the content size of each request. By default we use 100% of available heap meaning that the parent circuit breaker will limit the maximum available size. This value can be changed by adjusting the setting network.breaker.inflight_requests.limit Relates #16011 --- .../common/breaker/CircuitBreaker.java | 25 +-- .../common/breaker/MemoryCircuitBreaker.java | 2 +- .../common/breaker/NoopCircuitBreaker.java | 3 +- .../common/settings/ClusterSettings.java | 2 + .../HierarchyCircuitBreakerService.java | 27 ++- .../elasticsearch/transport/Transport.java | 3 +- .../transport/local/LocalTransport.java | 34 +++- .../local/LocalTransportChannel.java | 31 +-- .../netty/MessageChannelHandler.java | 178 +++++++++++++----- .../transport/netty/NettyHeader.java | 19 +- .../transport/netty/NettyTransport.java | 11 +- .../netty/NettyTransportChannel.java | 25 ++- .../netty/SizeHeaderFrameDecoder.java | 13 +- .../node/tasks/TaskManagerTestCase.java | 5 +- .../BroadcastReplicationTests.java | 6 +- .../cluster/NodeConnectionsServiceTests.java | 2 +- .../common/network/NetworkModuleTests.java | 6 +- .../discovery/ZenFaultDetectionTests.java | 6 +- .../zen/ping/unicast/UnicastZenPingIT.java | 5 +- .../mapper/DynamicMappingDisabledTests.java | 4 +- .../breaker/CircuitBreakerServiceIT.java | 108 +++++++++-- .../NettySizeHeaderFrameDecoderTests.java | 2 +- .../transport/TransportModuleTests.java | 5 +- .../netty/NettyScheduledPingTests.java | 8 +- .../transport/netty/NettyTransportIT.java | 86 ++------- .../netty/NettyTransportMultiPortTests.java | 3 +- .../modules/indices/circuit_breaker.asciidoc | 21 +++ .../test/InternalTestCluster.java | 23 ++- .../transport/AssertingLocalTransport.java | 5 +- .../test/transport/MockTransportService.java | 5 +- 30 files changed, 466 insertions(+), 207 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java index 7811ff35d561f..4f9c4e458c679 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java @@ -28,11 +28,12 @@ */ public interface CircuitBreaker { - public static final String PARENT = "parent"; - public static final String FIELDDATA = "fielddata"; - public static final String REQUEST = "request"; + String PARENT = "parent"; + String FIELDDATA = "fielddata"; + String REQUEST = "request"; + String IN_FLIGHT_REQUESTS = "in_flight_requests"; - public static enum Type { + enum Type { // A regular or child MemoryCircuitBreaker MEMORY, // A special parent-type for the hierarchy breaker service @@ -59,7 +60,7 @@ public static Type parseValue(String value) { * @param fieldName name of the field responsible for tripping the breaker * @param bytesNeeded bytes asked for but unable to be allocated */ - public void circuitBreak(String fieldName, long bytesNeeded); + void circuitBreak(String fieldName, long bytesNeeded); /** * add bytes to the breaker and maybe trip @@ -67,35 +68,35 @@ public static Type parseValue(String value) { * @param label string label describing the bytes being added * @return the number of "used" bytes for the circuit breaker */ - public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException; + double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException; /** * Adjust the circuit breaker without tripping */ - public long addWithoutBreaking(long bytes); + long addWithoutBreaking(long bytes); /** * @return the currently used bytes the breaker is tracking */ - public long getUsed(); + long getUsed(); /** * @return maximum number of bytes the circuit breaker can track before tripping */ - public long getLimit(); + long getLimit(); /** * @return overhead of circuit breaker */ - public double getOverhead(); + double getOverhead(); /** * @return the number of times the circuit breaker has been tripped */ - public long getTrippedCount(); + long getTrippedCount(); /** * @return the name of the breaker */ - public String getName(); + String getName(); } diff --git a/core/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java b/core/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java index 3a5297b277b4d..7dbdd7d6a6ebc 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/MemoryCircuitBreaker.java @@ -82,7 +82,7 @@ public void circuitBreak(String fieldName, long bytesNeeded) throws CircuitBreak final String message = "Data too large, data for field [" + fieldName + "] would be larger than limit of [" + memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]"; logger.debug("{}", message); - throw new CircuitBreakingException(message); + throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit); } /** diff --git a/core/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java b/core/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java index 1e749a40d50ac..6d1f573712755 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java @@ -24,6 +24,7 @@ * basically noops */ public class NoopCircuitBreaker implements CircuitBreaker { + public static final int LIMIT = -1; private final String name; @@ -53,7 +54,7 @@ public long getUsed() { @Override public long getLimit() { - return 0; + return LIMIT; } @Override diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 56ed12711f83d..83f9afc3f3836 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -254,6 +254,8 @@ public void apply(Settings value, Settings current, Settings previous) { HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, + HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, + HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, diff --git a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java index d2d96092186e9..45304d135cb1b 100644 --- a/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java +++ b/core/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java @@ -45,7 +45,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker."; - private final ConcurrentMap breakers = new ConcurrentHashMap(); + private final ConcurrentMap breakers = new ConcurrentHashMap<>(); public static final Setting TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING = Setting.byteSizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope); @@ -64,10 +64,16 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService { public static final Setting REQUEST_CIRCUIT_BREAKER_TYPE_SETTING = new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); - + public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING = + Setting.byteSizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope); + public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING = + Setting.doubleSetting("network.breaker.inflight_requests.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope); + public static final Setting IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING = + new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope); private volatile BreakerSettings parentSettings; private volatile BreakerSettings fielddataSettings; + private volatile BreakerSettings inFlightRequestsSettings; private volatile BreakerSettings requestSettings; // Tripped count for when redistribution was attempted but wasn't successful @@ -82,6 +88,12 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings) ); + this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, + IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), + IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), + IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings) + ); + this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST, REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(), REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings), @@ -95,11 +107,14 @@ public HierarchyCircuitBreakerService(Settings settings, ClusterSettings cluster registerBreaker(this.requestSettings); registerBreaker(this.fielddataSettings); + registerBreaker(this.inFlightRequestsSettings); clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit); clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit); + clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit); clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setRequestBreakerLimit); } + private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) { BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.bytes(), newRequestOverhead, HierarchyCircuitBreakerService.this.requestSettings.getType()); @@ -108,6 +123,14 @@ private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newReque logger.info("Updated breaker settings request: {}", newRequestSettings); } + private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) { + BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, newInFlightRequestsMax.bytes(), + newInFlightRequestsOverhead, HierarchyCircuitBreakerService.this.inFlightRequestsSettings.getType()); + registerBreaker(newInFlightRequestsSettings); + HierarchyCircuitBreakerService.this.inFlightRequestsSettings = newInFlightRequestsSettings; + logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings); + } + private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) { long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes(); newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead; diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 532c9d99ace4d..5acd750469115 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -85,7 +85,8 @@ public interface Transport extends LifecycleComponent { /** * Sends the request to the node. */ - void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; + void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws + IOException, TransportException; /** * Returns count of currently open connections diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 8585e54f50d0f..1c224e1aeef76 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -37,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.ConnectTransportException; @@ -83,13 +85,15 @@ public class LocalTransport extends AbstractLifecycleComponent implem private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); protected final NamedWriteableRegistry namedWriteableRegistry; + private final CircuitBreakerService circuitBreakerService; public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address"; public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers"; public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue"; @Inject - public LocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + public LocalTransport(Settings settings, ThreadPool threadPool, Version version, + NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super(settings); this.threadPool = threadPool; this.version = version; @@ -100,6 +104,7 @@ public LocalTransport(Settings settings, ThreadPool threadPool, Version version, this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext()); this.namedWriteableRegistry = namedWriteableRegistry; + this.circuitBreakerService = circuitBreakerService; } @Override @@ -239,6 +244,11 @@ ThreadPoolExecutor workers() { return this.workers; } + CircuitBreaker inFlightRequestsBreaker() { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } + protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { Transports.assertTransportThread(); @@ -253,13 +263,13 @@ protected void messageReceived(byte[] data, String action, LocalTransport source if (isRequest) { ThreadContext threadContext = threadPool.getThreadContext(); threadContext.readHeaders(stream); - handleRequest(stream, requestId, sourceTransport, version); + handleRequest(stream, requestId, data.length, sourceTransport, version); } else { final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); // ignore if its null, the adapter logs it if (handler != null) { if (TransportStatus.isError(status)) { - handlerResponseError(stream, handler); + handleResponseError(stream, handler); } else { handleResponse(stream, sourceTransport, handler); } @@ -267,9 +277,15 @@ protected void messageReceived(byte[] data, String action, LocalTransport source } } catch (Throwable e) { if (sendRequestId != null) { - TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(sendRequestId); + TransportResponseHandler handler = sourceTransport.transportServiceAdapter.onResponseReceived(sendRequestId); if (handler != null) { - handleException(handler, new RemoteTransportException(nodeName(), localAddress, action, e)); + RemoteTransportException error = new RemoteTransportException(nodeName(), localAddress, action, e); + sourceTransport.workers().execute(() -> { + ThreadContext threadContext = sourceTransport.threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + sourceTransport.handleException(handler, error); + } + }); } } else { logger.warn("Failed to receive message for action [{}]", e, action); @@ -277,12 +293,14 @@ protected void messageReceived(byte[] data, String action, LocalTransport source } } - private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception { + private void handleRequest(StreamInput stream, long requestId, int messageLengthBytes, LocalTransport sourceTransport, + Version version) throws Exception { stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry); final String action = stream.readString(); transportServiceAdapter.onRequestReceived(requestId, action); + inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, - requestId, version); + requestId, version, messageLengthBytes); try { final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { @@ -356,7 +374,7 @@ protected void handleParsedResponse(final TransportResponse response, final Tran }); } - private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) { + private void handleResponseError(StreamInput buffer, final TransportResponseHandler handler) { Throwable error; try { error = buffer.readThrowable(); diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index 41eb7354098e1..8ddee299d98e6 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -30,6 +30,7 @@ import org.elasticsearch.transport.support.TransportStatus; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -45,15 +46,18 @@ public class LocalTransportChannel implements TransportChannel { private final String action; private final long requestId; private final Version version; + private final long reservedBytes; + private final AtomicBoolean closed = new AtomicBoolean(); public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, - LocalTransport targetTransport, String action, long requestId, Version version) { + LocalTransport targetTransport, String action, long requestId, Version version, long reservedBytes) { this.sourceTransport = sourceTransport; this.sourceTransportServiceAdapter = sourceTransportServiceAdapter; this.targetTransport = targetTransport; this.action = action; this.requestId = requestId; this.version = version; + this.reservedBytes = reservedBytes; } @Override @@ -80,13 +84,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op status = TransportStatus.setResponse(status); stream.writeByte(status); // 0 for request, 1 for response. response.writeTo(stream); - final byte[] data = stream.bytes().toBytes(); - targetTransport.workers().execute(() -> { - ThreadContext threadContext = targetTransport.threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()){ - targetTransport.messageReceived(data, action, sourceTransport, version, null); - } - }); + sendResponseData(stream.bytes().toBytes()); sourceTransportServiceAdapter.onResponseSent(requestId, action, response, options); } } @@ -98,15 +96,26 @@ public void sendResponse(Throwable error) throws IOException { RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error); stream.writeThrowable(tx); + sendResponseData(stream.bytes().toBytes()); + sourceTransportServiceAdapter.onResponseSent(requestId, action, error); + } - final byte[] data = stream.bytes().toBytes(); + private void sendResponseData(byte[] data) { + close(); targetTransport.workers().execute(() -> { ThreadContext threadContext = targetTransport.threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()){ + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { targetTransport.messageReceived(data, action, sourceTransport, version, null); } }); - sourceTransportServiceAdapter.onResponseSent(requestId, action, error); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + sourceTransport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 9eef440114474..0cd47b093add0 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -91,17 +91,15 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex return; } ChannelBuffer buffer = (ChannelBuffer) m; - int size = buffer.getInt(buffer.readerIndex() - 4); - transportServiceAdapter.received(size + 6); + Marker marker = new Marker(buffer); + int size = marker.messageSizeWithRemainingHeaders(); + transportServiceAdapter.received(marker.messageSizeWithAllHeaders()); // we have additional bytes to read, outside of the header - boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0; - - int markedReaderIndex = buffer.readerIndex(); - int expectedIndexReader = markedReaderIndex + size; + boolean hasMessageBytesToRead = marker.messageSize() != 0; // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh - // buffer, or in the cumlation buffer, which is cleaned each time + // buffer, or in the cumulation buffer, which is cleaned each time StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size); boolean success = false; try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) { @@ -134,25 +132,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex streamIn.setVersion(version); if (TransportStatus.isRequest(status)) { threadContext.readHeaders(streamIn); - String action = handleRequest(ctx.getChannel(), streamIn, requestId, version); - - // Chek the entire message has been read - final int nextByte = streamIn.read(); - // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker - if (nextByte != -1) { - throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action - + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); - } - if (buffer.readerIndex() < expectedIndexReader) { - throw new IllegalStateException("Message is fully read (request), yet there are " - + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); - } - if (buffer.readerIndex() > expectedIndexReader) { - throw new IllegalStateException( - "Message read past expected size (request) for requestId [" + requestId + "], action [" + action - + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); - } - + handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version); } else { TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); // ignore if its null, the adapter logs it @@ -162,25 +142,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } else { handleResponse(ctx.getChannel(), streamIn, handler); } - - // Chek the entire message has been read - final int nextByte = streamIn.read(); - // calling read() is useful to make sure the message is fully read, even if there is an EOS marker - if (nextByte != -1) { - throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" - + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); - } - if (buffer.readerIndex() < expectedIndexReader) { - throw new IllegalStateException("Message is fully read (response), yet there are " - + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); - } - if (buffer.readerIndex() > expectedIndexReader) { - throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId - + "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); - } - + marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status)); } } + success = true; } finally { try { if (success) { @@ -190,7 +155,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } } finally { // Set the expected position of the buffer, no matter what happened - buffer.readerIndex(expectedIndexReader); + buffer.readerIndex(marker.expectedReaderIndex()); } } } @@ -254,13 +219,17 @@ public void run() { } } - protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes, + Version version) throws IOException { buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry); final String action = buffer.readString(); transportServiceAdapter.onRequestReceived(requestId, action); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, - requestId, version, profileName); + NettyTransportChannel transportChannel = null; try { + transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, + requestId, version, profileName, messageLengthBytes); + final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException(action); @@ -268,6 +237,8 @@ protected String handleRequest(Channel channel, StreamInput buffer, long request final TransportRequest request = reg.newRequest(); request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); request.readFrom(buffer); + // in case we throw an exception, i.e. when the limit is hit, we don't want to verify + validateRequest(marker, buffer, requestId, action); if (ThreadPool.Names.SAME.equals(reg.getExecutor())) { //noinspection unchecked reg.processMessageReceived(request, transportChannel); @@ -275,6 +246,11 @@ protected String handleRequest(Channel channel, StreamInput buffer, long request threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); } } catch (Throwable e) { + // the circuit breaker tripped + if (transportChannel == null) { + transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, + requestId, version, profileName, 0); + } try { transportChannel.sendResponse(e); } catch (IOException e1) { @@ -285,6 +261,12 @@ protected String handleRequest(Channel channel, StreamInput buffer, long request return action; } + // This template method is needed to inject custom error checking logic in tests. + protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException { + marker.validateRequest(buffer, requestId, action); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { transport.exceptionCaught(ctx, e); @@ -346,4 +328,106 @@ public void onFailure(Throwable e) { } } } + + /** + * Internal helper class to store characteristic offsets of a buffer during processing + */ + protected static final class Marker { + private final ChannelBuffer buffer; + private final int remainingMessageSize; + private final int expectedReaderIndex; + + public Marker(ChannelBuffer buffer) { + this.buffer = buffer; + // when this constructor is called, we have read already two parts of the message header: the marker bytes and the message + // message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index for MESSAGE_LENGTH_SIZE bytes to read the + // remaining message length again. + this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE); + this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize; + } + + /** + * @return the number of bytes that have yet to be read from the buffer + */ + public int messageSizeWithRemainingHeaders() { + return remainingMessageSize; + } + + /** + * @return the number in bytes for the message including all headers (even the ones that have been read from the buffer already) + */ + public int messageSizeWithAllHeaders() { + return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE; + } + + /** + * @return the number of bytes for the message itself (excluding all headers). + */ + public int messageSize() { + return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE; + } + + /** + * @return the expected index of the buffer's reader after the message has been consumed entirely. + */ + public int expectedReaderIndex() { + return expectedReaderIndex; + } + + /** + * Validates that a request has been fully read (not too few bytes but also not too many bytes). + * + * @param stream A stream that is associated with the buffer that is tracked by this marker. + * @param requestId The current request id. + * @param action The currently executed action. + * @throws IOException Iff the stream could not be read. + * @throws IllegalStateException Iff the request has not been fully read. + */ + public void validateRequest(StreamInput stream, long requestId, String action) throws IOException { + final int nextByte = stream.read(); + // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker + if (nextByte != -1) { + throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting"); + } + if (buffer.readerIndex() < expectedReaderIndex) { + throw new IllegalStateException("Message is fully read (request), yet there are " + + (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting"); + } + if (buffer.readerIndex() > expectedReaderIndex) { + throw new IllegalStateException( + "Message read past expected size (request) for requestId [" + requestId + "], action [" + action + + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting"); + } + } + + /** + * Validates that a response has been fully read (not too few bytes but also not too many bytes). + * + * @param stream A stream that is associated with the buffer that is tracked by this marker. + * @param requestId The corresponding request id for this response. + * @param handler The current response handler. + * @param error Whether validate an error response. + * @throws IOException Iff the stream could not be read. + * @throws IllegalStateException Iff the request has not been fully read. + */ + public void validateResponse(StreamInput stream, long requestId, + TransportResponseHandler handler, boolean error) throws IOException { + // Check the entire message has been read + final int nextByte = stream.read(); + // calling read() is useful to make sure the message is fully read, even if there is an EOS marker + if (nextByte != -1) { + throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" + + handler + "], error [" + error + "]; resetting"); + } + if (buffer.readerIndex() < expectedReaderIndex) { + throw new IllegalStateException("Message is fully read (response), yet there are " + + (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting"); + } + if (buffer.readerIndex() > expectedReaderIndex) { + throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + + "], handler [" + handler + "], error [" + error + "]; resetting"); + } + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java index 0e83911a772fd..8e4423fb44785 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyHeader.java @@ -26,8 +26,17 @@ /** */ public class NettyHeader { + public static final int MARKER_BYTES_SIZE = 2 * 1; - public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; + public static final int MESSAGE_LENGTH_SIZE = 4; + + public static final int REQUEST_ID_SIZE = 8; + + public static final int STATUS_SIZE = 1; + + public static final int VERSION_ID_SIZE = 4; + + public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE; /** * The magic number (must be lower than 0) for a ping message. This is handled @@ -56,12 +65,12 @@ public static void writeHeader(ChannelBuffer buffer, long requestId, byte status buffer.setByte(index, 'S'); index += 1; // write the size, the size indicates the remaining message size, not including the size int - buffer.setInt(index, buffer.readableBytes() - 6); - index += 4; + buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE); + index += MESSAGE_LENGTH_SIZE; buffer.setLong(index, requestId); - index += 8; + index += REQUEST_ID_SIZE; buffer.setByte(index, status); - index += 1; + index += STATUS_SIZE; buffer.setInt(index, version.id); } } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 9453eaae22298..59649bff78365 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; @@ -56,6 +57,7 @@ import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BindTransportException; @@ -242,6 +244,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem protected volatile BoundTransportAddress boundAddress; protected final KeyedLock connectionLock = new KeyedLock<>(); protected final NamedWriteableRegistry namedWriteableRegistry; + private final CircuitBreakerService circuitBreakerService; // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) @@ -252,7 +255,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem @Inject public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, - NamedWriteableRegistry namedWriteableRegistry) { + NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super(settings); this.threadPool = threadPool; this.networkService = networkService; @@ -288,6 +291,7 @@ public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService n threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing); } this.namedWriteableRegistry = namedWriteableRegistry; + this.circuitBreakerService = circuitBreakerService; } public Settings settings() { @@ -307,6 +311,11 @@ ThreadPool threadPool() { return threadPool; } + CircuitBreaker inFlightRequestsBreaker() { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } + @Override protected void doStart() { boolean success = false; diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index c89523074dc90..57893ff1908b1 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -39,6 +39,7 @@ import org.jboss.netty.channel.ChannelFuture; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -52,9 +53,11 @@ public class NettyTransportChannel implements TransportChannel { private final Channel channel; private final long requestId; private final String profileName; + private final long reservedBytes; + private final AtomicBoolean closed = new AtomicBoolean(); public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, - long requestId, Version version, String profileName) { + long requestId, Version version, String profileName, long reservedBytes) { this.transportServiceAdapter = transportServiceAdapter; this.version = version; this.transport = transport; @@ -62,6 +65,7 @@ public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter t this.channel = channel; this.requestId = requestId; this.profileName = profileName; + this.reservedBytes = reservedBytes; } @Override @@ -81,6 +85,7 @@ public void sendResponse(TransportResponse response) throws IOException { @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + close(); if (transport.compress) { options = TransportResponseOptions.builder(options).withCompress(transport.compress).build(); } @@ -88,9 +93,10 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op byte status = 0; status = TransportStatus.setResponse(status); - ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(transport.bigArrays); + ReleasableBytesStreamOutput bStream = null; boolean addedReleaseListener = false; try { + bStream = new ReleasableBytesStreamOutput(transport.bigArrays); bStream.skip(NettyHeader.HEADER_SIZE); StreamOutput stream = bStream; if (options.compress()) { @@ -110,7 +116,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op addedReleaseListener = true; transportServiceAdapter.onResponseSent(requestId, action, response, options); } finally { - if (!addedReleaseListener) { + if (!addedReleaseListener && bStream != null) { Releasables.close(bStream.bytes()); } } @@ -118,10 +124,11 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op @Override public void sendResponse(Throwable error) throws IOException { + close(); BytesStreamOutput stream = new BytesStreamOutput(); stream.skip(NettyHeader.HEADER_SIZE); - RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), - action, error); + RemoteTransportException tx = new RemoteTransportException( + transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); stream.writeThrowable(tx); byte status = 0; status = TransportStatus.setResponse(status); @@ -134,6 +141,14 @@ public void sendResponse(Throwable error) throws IOException { transportServiceAdapter.onResponseSent(requestId, action, error); } + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes); + } + @Override public long getRequestId() { return requestId; diff --git a/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java b/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java index aab83d293d82c..9c410e4b912b6 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java @@ -41,7 +41,8 @@ public class SizeHeaderFrameDecoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { - if (buffer.readableBytes() < 6) { + final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE; + if (buffer.readableBytes() < sizeHeaderLength) { return null; } @@ -68,11 +69,11 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe + Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")"); } - int dataLen = buffer.getInt(buffer.readerIndex() + 2); + int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE); if (dataLen == NettyHeader.PING_DATA_SIZE) { // discard the messages we read and continue, this is achieved by skipping the bytes // and returning null - buffer.skipBytes(6); + buffer.skipBytes(sizeHeaderLength); return null; } if (dataLen <= 0) { @@ -84,10 +85,10 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]"); } - if (buffer.readableBytes() < dataLen + 6) { + if (buffer.readableBytes() < dataLen + sizeHeaderLength) { return null; } - buffer.skipBytes(6); + buffer.skipBytes(sizeHeaderLength); return buffer; } @@ -121,4 +122,4 @@ public HttpOnTransportException(StreamInput in) throws IOException{ super(in); } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 7471a6b6af5a4..9153e659d0bc6 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; @@ -184,8 +185,8 @@ protected boolean accumulateExceptions() { public static class TestNode implements Releasable { public TestNode(String name, ThreadPool threadPool, Settings settings) { transportService = new TransportService(settings, - new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()), - threadPool) { + new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry(), + new NoneCircuitBreakerService()), threadPool) { @Override protected TaskManager createTaskManager() { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 9170ff2e5a621..0bd7f9bf18a75 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -40,6 +40,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -72,6 +74,7 @@ public class BroadcastReplicationTests extends ESTestCase { private static ThreadPool threadPool; + private static CircuitBreakerService circuitBreakerService; private ClusterService clusterService; private TransportService transportService; private TestBroadcastReplicationAction broadcastReplicationAction; @@ -79,13 +82,14 @@ public class BroadcastReplicationTests extends ESTestCase { @BeforeClass public static void beforeClass() { threadPool = new ThreadPool("BroadcastReplicationTests"); + circuitBreakerService = new NoneCircuitBreakerService(); } @Override @Before public void setUp() throws Exception { super.setUp(); - LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()); + LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry(), circuitBreakerService); clusterService = createClusterService(threadPool); transportService = new TransportService(transport, threadPool); transportService.start(); diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index e54dc4b587403..c263bcbcf378c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -267,4 +267,4 @@ public void close() { } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index 4d8f3e6e58d42..cbc517ceb64a3 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.http.HttpInfo; import org.elasticsearch.http.HttpServerAdapter; import org.elasticsearch.http.HttpServerTransport; @@ -43,15 +42,12 @@ import org.elasticsearch.rest.action.cat.RestNodesAction; import org.elasticsearch.rest.action.main.RestMainAction; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.Task.Status; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import static org.hamcrest.Matchers.sameInstance; - public class NetworkModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { @@ -62,7 +58,7 @@ public FakeTransportService() { static class FakeTransport extends AssertingLocalTransport { public FakeTransport() { - super(null, null, null, null); + super(null, null, null, null, null); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index bedbbbd53914e..ab834ca6ae2ec 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -30,6 +30,8 @@ import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -52,6 +54,7 @@ public class ZenFaultDetectionTests extends ESTestCase { protected ThreadPool threadPool; protected ClusterService clusterService; + private CircuitBreakerService circuitBreakerService; protected static final Version version0 = Version.fromId(/*0*/99); protected DiscoveryNode nodeA; @@ -67,6 +70,7 @@ public void setUp() throws Exception { super.setUp(); threadPool = new ThreadPool(getClass().getName()); clusterService = createClusterService(threadPool); + circuitBreakerService = new NoneCircuitBreakerService(); serviceA = build(Settings.builder().put("name", "TS_A").build(), version0); nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build(Settings.builder().put("name", "TS_B").build(), version1); @@ -112,7 +116,7 @@ public void tearDown() throws Exception { protected MockTransportService build(Settings settings, Version version) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); MockTransportService transportService = new MockTransportService(Settings.EMPTY, - new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool); + new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool); transportService.start(); transportService.acceptIncomingRequests(); return transportService; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java index e788943348c6d..00e3daf1fc81c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -57,7 +58,7 @@ public void testSimplePings() throws InterruptedException { NetworkService networkService = new NetworkService(settings); ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT); - NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); + NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); transportServiceA.acceptIncomingRequests(); final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), @@ -65,7 +66,7 @@ public void testSimplePings() throws InterruptedException { InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); + NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); transportServiceB.acceptIncomingRequests(); final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index 51e88e50edd31..a7ceec92a61b3 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -73,7 +74,8 @@ public void setUp() throws Exception { .put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false) .build(); clusterService = createClusterService(THREAD_POOL); - transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry()); + transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(), + new NoneCircuitBreakerService()); transportService = new TransportService(transport, THREAD_POOL); indicesService = getInstanceFromNode(IndicesService.class); shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL); diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index c805dd0cc69fe..ae47d2366d4ee 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -19,14 +19,24 @@ package org.elasticsearch.indices.memory.breaker; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.indices.breaker.BreakerSettings; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -40,6 +50,7 @@ import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -50,6 +61,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -69,6 +81,9 @@ private void reset() { .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null)) .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0) + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), + HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null)) + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings)); } @@ -87,10 +102,13 @@ public void teardown() { private boolean noopBreakerUsed() { NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().setBreaker(true).get(); for (NodeStats nodeStats : stats) { - if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == 0) { + if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == NoopCircuitBreaker.LIMIT) { return true; } - if (nodeStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getLimit() == 0) { + if (nodeStats.getBreaker().getStats(CircuitBreaker.IN_FLIGHT_REQUESTS).getLimit() == NoopCircuitBreaker.LIMIT) { + return true; + } + if (nodeStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getLimit() == NoopCircuitBreaker.LIMIT) { return true; } } @@ -225,17 +243,19 @@ public void testParentChecking() throws Exception { fail("should have thrown an exception"); } catch (Exception e) { String errMsg = "[fielddata] Data too large, data for [test] would be larger than limit of [10/10b]"; - assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException", - e.toString().contains(errMsg), equalTo(true)); + assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", + e.toString(), containsString(errMsg)); } assertFailures(client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC), RestStatus.INTERNAL_SERVER_ERROR, containsString("Data too large, data for [test] would be larger than limit of [10/10b]")); - // Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't + // Adjust settings so the parent breaker will fail, but neither the fielddata breaker nor the node request breaker will fail + // There is no "one size fits all" breaker size as internal request size will vary based on doc count. + int parentBreakerSize = docCount * 3; resetSettings = Settings.builder() - .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "15b") + .put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), parentBreakerSize + "b") .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "90%") .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0) .build(); @@ -246,9 +266,9 @@ public void testParentChecking() throws Exception { client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get(); fail("should have thrown an exception"); } catch (Exception e) { - String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [15/15b]"; - assertThat("Exception: " +e.toString() + " should contain a CircuitBreakingException", - e.toString().contains(errMsg), equalTo(true)); + String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [" + parentBreakerSize; + assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", + e.toString(), containsString(errMsg)); } } @@ -280,8 +300,8 @@ public void testRequestBreaker() throws Exception { fail("aggregation should have tripped the breaker"); } catch (Exception e) { String errMsg = "CircuitBreakingException[[request] Data too large, data for [] would be larger than limit of [10/10b]]"; - assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException", - e.toString().contains(errMsg), equalTo(true)); + assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", + e.toString(), containsString(errMsg)); } } @@ -330,4 +350,70 @@ public void testCustomCircuitBreakerRegistration() throws Exception { } assertThat(breaks, greaterThanOrEqualTo(1)); } + + public void testLimitsRequestSize() throws Exception { + ByteSizeValue inFlightRequestsLimit = new ByteSizeValue(8, ByteSizeUnit.KB); + if (noopBreakerUsed()) { + logger.info("--> noop breakers used, skipping test"); + return; + } + + internalCluster().ensureAtLeastNumDataNodes(2); + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); + List dataNodeStats = new ArrayList<>(); + for (NodeStats stat : nodeStats.getNodes()) { + if (stat.getNode().isDataNode()) { + dataNodeStats.add(stat); + } + } + + assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2)); + Collections.shuffle(dataNodeStats, random()); + + // send bulk request from source node to target node later. The sole shard is bound to the target node. + NodeStats targetNode = dataNodeStats.get(0); + NodeStats sourceNode = dataNodeStats.get(1); + + assertAcked(prepareCreate("index").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put("index.routing.allocation.include._name", targetNode.getNode().getName()) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE) + )); + + Client client = client(sourceNode.getNode().getName()); + + // we use the limit size as a (very) rough indication on how many requests we should sent to hit the limit + int numRequests = inFlightRequestsLimit.bytesAsInt(); + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numRequests; i++) { + IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i)); + indexRequest.source("field", "value", "num", i); + bulkRequest.add(indexRequest); + } + + Settings limitSettings = Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), inFlightRequestsLimit) + .build(); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(limitSettings)); + + // can either fail directly with an exception or the response contains exceptions (depending on client) + try { + BulkResponse response = client.bulk(bulkRequest).actionGet(); + if (!response.hasFailures()) { + fail("Should have thrown CircuitBreakingException"); + } else { + // each item must have failed with CircuitBreakingException + for (BulkItemResponse bulkItemResponse : response) { + Throwable cause = ExceptionsHelper.unwrapCause(bulkItemResponse.getFailure().getCause()); + assertThat(cause, instanceOf(CircuitBreakingException.class)); + assertEquals(((CircuitBreakingException) cause).getByteLimit(), inFlightRequestsLimit.bytes()); + } + } + } catch (CircuitBreakingException ex) { + assertEquals(ex.getByteLimit(), inFlightRequestsLimit.bytes()); + } + } } diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index cc1b7869a587e..0e8c27c3143a7 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -66,7 +66,7 @@ public void startThreadPool() { threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); NetworkService networkService = new NetworkService(settings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); - nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry()); + nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); nettyTransport.start(); TransportService transportService = new TransportService(nettyTransport, threadPool); nettyTransport.transportServiceAdapter(transportService.createAdapter()); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java index d587ab05e4592..1caeb1b13efe6 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.ModuleTestCase; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.threadpool.ThreadPool; @@ -34,8 +35,8 @@ public class TransportModuleTests extends ModuleTestCase { static class FakeTransport extends AssertingLocalTransport { @Inject - public FakeTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { - super(settings, threadPool, version, namedWriteableRegistry); + public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + super(settings, circuitBreakerService, threadPool, version, namedWriteableRegistry); } } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 133e13beee65d..1baa087717cc8 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -53,14 +55,16 @@ public void testScheduledPing() throws Exception { Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE.getKey(), "5ms").put(TransportSettings.PORT.getKey(), 0).build(); + CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); + NamedWriteableRegistry registryA = new NamedWriteableRegistry(); - final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA); + final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService); MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); serviceA.start(); serviceA.acceptIncomingRequests(); NamedWriteableRegistry registryB = new NamedWriteableRegistry(); - final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB); + final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService); MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); serviceB.start(); serviceB.acceptIncomingRequests(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java index dc0a533707e29..9c5ce454730ec 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; @@ -31,25 +30,20 @@ import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ActionNotFoundTransportException; -import org.elasticsearch.transport.RequestHandlerRegistry; -import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportSettings; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; @@ -106,8 +100,9 @@ public void onModule(NetworkModule module) { } @Inject - public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) { - super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry); + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + Version version, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { + super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry, circuitBreakerService); } @Override @@ -130,73 +125,20 @@ public ChannelPipeline getPipeline() throws Exception { pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) { @Override - protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { - final String action = buffer.readString(); - - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, name); - try { - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); - if (reg == null) { - throw new ActionNotFoundTransportException(action); - } - final TransportRequest request = reg.newRequest(); - request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); - request.readFrom(buffer); - String error = threadPool.getThreadContext().getHeader("ERROR"); - if (error != null) { - throw new ElasticsearchException(error); - } - if (reg.getExecutor() == ThreadPool.Names.SAME) { - //noinspection unchecked - reg.processMessageReceived(request, transportChannel); - } else { - threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); - } - } catch (Throwable e) { - try { - transportChannel.sendResponse(e); - } catch (IOException e1) { - logger.warn("Failed to send error message back to client for action [{}]", e, action); - logger.warn("Actual Exception", e1); - } - } - channelProfileName = transportChannel.getProfileName(); + protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, + int messageLengthBytes, Version version) throws IOException { + String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version); + channelProfileName = this.profileName; return action; } - class RequestHandler extends AbstractRunnable { - private final RequestHandlerRegistry reg; - private final TransportRequest request; - private final NettyTransportChannel transportChannel; - - public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) { - this.reg = reg; - this.request = request; - this.transportChannel = transportChannel; - } - - @SuppressWarnings({"unchecked"}) - @Override - protected void doRun() throws Exception { - reg.processMessageReceived(request, transportChannel); - } - - @Override - public boolean isForceExecution() { - return reg.isForceExecution(); + @Override + protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException { + super.validateRequest(marker, buffer, requestId, action); + String error = threadPool.getThreadContext().getHeader("ERROR"); + if (error != null) { + throw new ElasticsearchException(error); } - - @Override - public void onFailure(Throwable e) { - if (transport.lifecycleState() == Lifecycle.State.STARTED) { - // we can only send a response transport is started.... - try { - transportChannel.sendResponse(e); - } catch (Throwable e1) { - logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction()); - logger.warn("Actual Exception", e); - } - } } } }); return pipeline; diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java index 0ea059d46fa47..8ffd89428baf5 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java @@ -136,7 +136,8 @@ public void testThatProfileWithoutValidNameIsIgnored() throws Exception { private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) { BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); - NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, new NamedWriteableRegistry()); + NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, + new NamedWriteableRegistry(), new NoneCircuitBreakerService()); nettyTransport.start(); assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED)); diff --git a/docs/reference/modules/indices/circuit_breaker.asciidoc b/docs/reference/modules/indices/circuit_breaker.asciidoc index 1caa87f920298..762833d527b98 100644 --- a/docs/reference/modules/indices/circuit_breaker.asciidoc +++ b/docs/reference/modules/indices/circuit_breaker.asciidoc @@ -54,3 +54,24 @@ request) from exceeding a certain amount of memory. A constant that all request estimations are multiplied with to determine a final estimation. Defaults to 1 +[[in-flight-circuit-breaker]] +[float] +==== In flight requests circuit breaker + +The in flight requests circuit breaker allows Elasticsearch to limit the memory usage of all +currently active incoming requests on transport or HTTP level from exceeding a certain amount of +memory on a node. The memory usage is based on the content length of the request itself. + +`network.breaker.inflight_requests.limit`:: + + Limit for in flight requests breaker, defaults to 100% of JVM heap. This means that it is bound + by the limit configured for the parent circuit breaker. + +`network.breaker.inflight_requests.overhead`:: + + A constant that all in flight requests estimations are multiplied with to determine a + final estimation. Defaults to 1 + +[[http-circuit-breaker]] +[float] + diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index fe32a89c4b5b5..6a72d5cc1a969 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -994,7 +994,6 @@ public void beforeIndexDeletion() { assertShardIndexCounter(); //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); - } private void assertSameSyncIdSameDocs() { @@ -1852,6 +1851,7 @@ public void run() { @Override public void assertAfterTest() throws IOException { super.assertAfterTest(); + assertRequestsFinished(); for (NodeEnvironment env : this.getInstances(NodeEnvironment.class)) { Set shardIds = env.lockedShards(); for (ShardId id : shardIds) { @@ -1864,6 +1864,27 @@ public void assertAfterTest() throws IOException { } } + private void assertRequestsFinished() { + if (size() > 0) { + for (NodeAndClient nodeAndClient : nodes.values()) { + CircuitBreaker inFlightRequestsBreaker = getInstance(HierarchyCircuitBreakerService.class, nodeAndClient.name) + .getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + try { + // see #ensureEstimatedStats() + assertBusy(() -> { + // ensure that our size accounting on transport level is reset properly + long bytesUsed = inFlightRequestsBreaker.getUsed(); + assertThat("All incoming requests on node [" + nodeAndClient.name + "] should have finished. Expected 0 but got " + + bytesUsed, bytesUsed, equalTo(0L)); + }); + } catch (Exception e) { + logger.error("Could not assert finished requests within timeout", e); + fail("Could not assert finished requests within timeout on node [" + nodeAndClient.name + "]"); + } + } + } + } + /** * Simple interface that allows to wait for an async operation to finish * @param the result of the async execution diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java index 322882a7b3c58..1e18a160f4a03 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.VersionUtils; @@ -79,8 +80,8 @@ public void onModule(SettingsModule module) { private final Version maxVersion; @Inject - public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { - super(settings, threadPool, version, namedWriteableRegistry); + public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + super(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService); final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings); random = new Random(seed); minVersion = ASSERTING_TRANSPORT_MIN_VERSION_KEY.get(settings); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f5fd51238477f..20d9a22867543 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.tasks.MockTaskManager; @@ -98,14 +99,14 @@ public Settings additionalSettings() { public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); - Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry); + Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry, new NoneCircuitBreakerService()); return new MockTransportService(settings, transport, threadPool); } public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, - version, namedWriteableRegistry); + version, namedWriteableRegistry, new NoneCircuitBreakerService()); return new MockTransportService(Settings.EMPTY, transport, threadPool); } From 117bc68af3987004ade7e56f0aebace72cd1d0ef Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 13 Apr 2016 09:58:08 +0200 Subject: [PATCH 2/2] Limit request size on HTTP level With this commit we limit the size of all in-flight requests on HTTP level. The size is guarded by the same circuit breaker that is also used on transport level. Similarly, the size that is used is HTTP content length. Relates #16011 --- .../breaker/CircuitBreakingException.java | 6 + .../org/elasticsearch/http/HttpServer.java | 92 +++++- .../http/netty/HttpRequestHandler.java | 7 +- .../http/netty/NettyHttpChannel.java | 18 +- .../elasticsearch/rest/RestController.java | 22 +- .../elasticsearch/http/HttpServerTests.java | 262 ++++++++++++++++++ .../http/netty/NettyHttpChannelTests.java | 6 +- .../http/netty/NettyHttpClient.java | 43 ++- .../netty/NettyHttpRequestSizeLimitIT.java | 98 +++++++ .../netty/NettyHttpServerPipeliningTests.java | 8 +- .../http/netty/NettyPipeliningDisabledIT.java | 2 +- .../http/netty/NettyPipeliningEnabledIT.java | 4 +- .../rest/RestControllerTests.java | 2 +- .../rest/RestFilterChainTests.java | 70 +---- .../action/cat/RestRecoveryActionTests.java | 1 - .../test/rest/FakeRestChannel.java | 91 ++++++ .../test/rest/FakeRestRequest.java | 11 +- 17 files changed, 628 insertions(+), 115 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/http/HttpServerTests.java create mode 100644 core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java create mode 100644 test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index 8347e54d4bd24..e700d30164480 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -66,6 +67,11 @@ public long getByteLimit() { return this.byteLimit; } + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } + @Override protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { builder.field("bytes_wanted", bytesWanted); diff --git a/core/src/main/java/org/elasticsearch/http/HttpServer.java b/core/src/main/java/org/elasticsearch/http/HttpServer.java index 5ca565e4c88e1..45abad0fb8186 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpServer.java +++ b/core/src/main/java/org/elasticsearch/http/HttpServer.java @@ -19,22 +19,29 @@ package org.elasticsearch.http; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.env.Environment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.rest.RestStatus.FORBIDDEN; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -43,24 +50,22 @@ * A component to serve http requests, backed by rest handlers. */ public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter { - - private final Environment environment; - private final HttpServerTransport transport; private final RestController restController; private final NodeService nodeService; + private final CircuitBreakerService circuitBreakerService; + @Inject - public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, - RestController restController, - NodeService nodeService) { + public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService, + CircuitBreakerService circuitBreakerService) { super(settings); - this.environment = environment; this.transport = transport; this.restController = restController; this.nodeService = nodeService; + this.circuitBreakerService = circuitBreakerService; nodeService.setHttpServer(this); transport.httpServerAdapter(this); } @@ -99,7 +104,15 @@ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadCont handleFavicon(request, channel); return; } - restController.dispatchRequest(request, channel, threadContext); + RestChannel responseChannel = channel; + try { + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), ""); + // iff we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService); + restController.dispatchRequest(request, responseChannel, threadContext); + } catch (Throwable t) { + restController.sendErrorResponse(request, responseChannel, t); + } } void handleFavicon(RestRequest request, RestChannel channel) { @@ -118,4 +131,65 @@ void handleFavicon(RestRequest request, RestChannel channel) { channel.sendResponse(new BytesRestResponse(FORBIDDEN)); } } + + private static final class ResourceHandlingHttpChannel implements RestChannel { + private final RestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final AtomicBoolean closed = new AtomicBoolean(); + + public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return delegate.newBuilder(autoDetectSource, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + + @Override + public void sendResponse(RestResponse response) { + close(); + delegate.sendResponse(response); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length()); + } + + } + + private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } } diff --git a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java index c140a3be6de5e..376ca738fabb5 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java @@ -61,11 +61,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally // when reading, or using a cumalation buffer NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel()); - if (oue != null) { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled)); - } else { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, detailedErrorsEnabled)); - } + NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled); + serverTransport.dispatchRequest(httpRequest, channel); super.messageReceived(ctx, e); } diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java index cd45d259293d8..a634db247aa53 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.netty; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; @@ -58,19 +59,22 @@ public final class NettyHttpChannel extends AbstractRestChannel { private final NettyHttpServerTransport transport; private final Channel channel; private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest; - private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null; - + private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent; + + /** + * @param transport The corresponding NettyHttpServerTransport where this channel belongs to. + * @param request The request that is handled by this channel. + * @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if + * HTTP pipelining is disabled. + * @param detailedErrorsEnabled true iff error messages should include stack traces. + */ public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, + @Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { super(request, detailedErrorsEnabled); this.transport = transport; this.channel = request.getChannel(); this.nettyRequest = request.request(); - } - - public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, - OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { - this(transport, request, detailedErrorsEnabled); this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent; } diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index 0cbfdd0ef1beb..6da1a929f7c83 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -158,11 +158,11 @@ public RestFilterChain filterChain(RestFilter executionFilter) { return new ControllerFilterChain(executionFilter); } - public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) { + public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception { if (!checkRequestParameters(request, channel)) { return; } - try (ThreadContext.StoredContext t = threadContext.stashContext()){ + try (ThreadContext.StoredContext t = threadContext.stashContext()) { for (String key : relevantHeaders) { String httpHeader = request.header(key); if (httpHeader != null) { @@ -170,15 +170,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel } } if (filters.length == 0) { - try { - executeHandler(request, channel); - } catch (Throwable e) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (Throwable e1) { - logger.error("failed to send failure response for uri [{}]", e1, request.uri()); - } - } + executeHandler(request, channel); } else { ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter); filterChain.continueProcessing(request, channel); @@ -186,6 +178,14 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel } } + public void sendErrorResponse(RestRequest request, RestChannel channel, Throwable e) { + try { + channel.sendResponse(new BytesRestResponse(channel, e)); + } catch (Throwable e1) { + logger.error("failed to send failure response for uri [{}]", e1, request.uri()); + } + } + /** * Checks the request parameters against enabled settings for error trace support * @return true if the request does not have any parameters that conflict with system settings diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java new file mode 100644 index 0000000000000..28fc315d3e14a --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.ByteBufferBytesReference; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class HttpServerTests extends ESTestCase { + private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20); + private HttpServer httpServer; + private CircuitBreaker inFlightRequestsBreaker; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService( + Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + // we can do this here only because we know that we don't adjust breaker settings dynamically in the test + inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + + HttpServerTransport httpServerTransport = new TestHttpServerTransport(); + RestController restController = new RestController(settings); + restController.registerHandler(RestRequest.Method.GET, "/", + (request, channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK))); + restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel) -> { + throw new IllegalArgumentException("test error"); + }); + + ClusterService clusterService = new ClusterService(Settings.EMPTY, null, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, ClusterName.DEFAULT); + NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, null, + clusterService, null); + httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService); + httpServer.start(); + } + + public void testDispatchRequestAddsAndFreesBytesOnSuccess() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/error", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + // we will produce an error in the rest handler and one more when sending the error response + TestRestRequest request = new TestRestRequest("/error", content); + ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestLimitsBytes() { + int contentLength = BREAKER_LIMIT.bytesAsInt() + 1; + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(1, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements + HttpServerTransport { + + public TestHttpServerTransport() { + super(Settings.EMPTY); + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public BoundTransportAddress boundAddress() { + LocalTransportAddress transportAddress = new LocalTransportAddress("1"); + return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress); + } + + @Override + public HttpInfo info() { + return null; + } + + @Override + public HttpStats stats() { + return null; + } + + @Override + public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { + + } + } + + private static final class AssertingChannel extends AbstractRestChannel { + private final RestStatus expectedStatus; + + protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) { + super(request, detailedErrorsEnabled); + this.expectedStatus = expectedStatus; + } + + @Override + public void sendResponse(RestResponse response) { + assertEquals(expectedStatus, response.status()); + } + } + + private static final class ExceptionThrowingChannel extends AbstractRestChannel { + + protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) { + super(request, detailedErrorsEnabled); + } + + @Override + public void sendResponse(RestResponse response) { + throw new IllegalStateException("always throwing an exception for testing"); + } + } + + private static final class TestRestRequest extends RestRequest { + private final String path; + private final BytesReference content; + + private TestRestRequest(String path, String content) { + this.path = path; + this.content = new ByteBufferBytesReference(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return null; + } + + @Override + public String rawPath() { + return path; + } + + @Override + public boolean hasContent() { + return true; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public String header(String name) { + return null; + } + + @Override + public Iterable> headers() { + return null; + } + + @Override + public boolean hasParam(String key) { + return false; + } + + @Override + public String param(String key) { + return null; + } + + @Override + public String param(String key, String defaultValue) { + return null; + } + + @Override + public Map params() { + return null; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java index 883caf06a0013..ce9051ad18935 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java @@ -342,6 +342,8 @@ private static class TestHttpRequest implements HttpRequest { private HttpHeaders headers = new DefaultHttpHeaders(); + private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER; + @Override public HttpMethod getMethod() { return null; @@ -379,12 +381,12 @@ public HttpHeaders headers() { @Override public ChannelBuffer getContent() { - return ChannelBuffers.EMPTY_BUFFER; + return content; } @Override public void setContent(ChannelBuffer content) { - + this.content = content; } @Override diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java index 139e1a0647da3..4b04a6259c009 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java @@ -18,13 +18,17 @@ */ package org.elasticsearch.http.netty; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -33,6 +37,7 @@ import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -76,9 +81,34 @@ public NettyHttpClient() { clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());; } - public synchronized Collection sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(uris.length); - final Collection content = Collections.synchronizedList(new ArrayList(uris.length)); + public Collection get(SocketAddress remoteAddress, String... uris) throws InterruptedException { + Collection requests = new ArrayList<>(uris.length); + for (int i = 0; i < uris.length; i++) { + final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); + requests.add(httpRequest); + } + return sendRequests(remoteAddress, requests); + } + + public Collection post(SocketAddress remoteAddress, Tuple... urisAndBodies) throws InterruptedException { + Collection requests = new ArrayList<>(urisAndBodies.length); + for (Tuple uriAndBody : urisAndBodies) { + ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8); + HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1()); + request.headers().add(HOST, "localhost"); + request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes()); + request.setContent(content); + requests.add(request); + } + return sendRequests(remoteAddress, requests); + } + + private synchronized Collection sendRequests(SocketAddress remoteAddress, Collection requests) + throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(requests.size()); + final Collection content = Collections.synchronizedList(new ArrayList<>(requests.size())); clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content)); @@ -87,11 +117,8 @@ public synchronized Collection sendRequests(SocketAddress remoteAd channelFuture = clientBootstrap.connect(remoteAddress); channelFuture.await(1000); - for (int i = 0; i < uris.length; i++) { - final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); - httpRequest.headers().add(HOST, "localhost"); - httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - channelFuture.getChannel().write(httpRequest); + for (HttpRequest request : requests) { + channelFuture.getChannel().write(request); } latch.await(); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java new file mode 100644 index 0000000000000..71c0bcf338195 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collection; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase { + private static final ByteSizeValue LIMIT = new ByteSizeValue(1, ByteSizeUnit.KB); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) + .build(); + } + + public void testLimitsInFlightRequests() throws Exception { + ensureGreen(); + + // we use the limit size as a (very) rough indication on how many requests we should sent to hit the limit + int numRequests = LIMIT.bytesAsInt() / 50; + + StringBuilder bulkRequest = new StringBuilder(); + for (int i = 0; i < numRequests; i++) { + bulkRequest.append("{\"index\": {}}"); + bulkRequest.append(System.lineSeparator()); + bulkRequest.append("{ \"field\" : \"value\" }"); + bulkRequest.append(System.lineSeparator()); + } + + Tuple[] requests = new Tuple[] { + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest) + }; + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress + ().boundAddresses()); + + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection singleResponse = nettyHttpClient.post(inetSocketTransportAddress.address(), requests[0]); + assertThat(singleResponse, hasSize(1)); + assertAtLeastOnceExpectedStatus(singleResponse, HttpResponseStatus.OK); + + @SuppressWarnings("unchecked") + Collection multipleResponses = nettyHttpClient.post(inetSocketTransportAddress.address(), requests); + assertThat(multipleResponses, hasSize(requests.length)); + assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.SERVICE_UNAVAILABLE); + } + } + + private void assertAtLeastOnceExpectedStatus(Collection responses, HttpResponseStatus expectedStatus) { + long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count(); + assertThat(countResponseErrors, greaterThan(0L)); + + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java index e63d45734ee27..ece00173627f4 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java @@ -101,7 +101,7 @@ public void testThatHttpPipeliningWorksWhenEnabled() throws Exception { List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); Collection responseBodies = returnHttpResponseBodies(responses); assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast")); } @@ -118,7 +118,7 @@ public void testThatHttpPipeliningCanBeDisabled() throws Exception { List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); List responseBodies = new ArrayList<>(returnHttpResponseBodies(responses)); // we cannot be sure about the order of the fast requests, but the slow ones should have to be last assertThat(responseBodies, hasSize(5)); @@ -132,7 +132,9 @@ class CustomNettyHttpServerTransport extends NettyHttpServerTransport { private final ExecutorService executorService; public CustomNettyHttpServerTransport(Settings settings) { - super(settings, NettyHttpServerPipeliningTests.this.networkService, NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool); + super(settings, NettyHttpServerPipeliningTests.this.networkService, + NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool + ); this.executorService = Executors.newFixedThreadPool(5); } diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java index 0097ddf2c24f6..9420f1de928f6 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java @@ -55,7 +55,7 @@ public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(requests.size())); List opaqueIds = new ArrayList<>(returnOpaqueIds(responses)); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java index b2f1b8cb592fc..1eccb94679721 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java @@ -51,7 +51,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(5)); Collection opaqueIds = returnOpaqueIds(responses); @@ -68,4 +68,4 @@ private void assertOpaqueIdsInOrder(Collection opaqueIds) { } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java index d6e1a97ac8fd3..101d2d8e50dd6 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -69,7 +69,7 @@ public void testRegisterRelevantHeaders() throws InterruptedException { assertThat(relevantHeaders, equalTo(headersArray)); } - public void testApplyRelevantHeaders() { + public void testApplyRelevantHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); final RestController restController = new RestController(Settings.EMPTY) { @Override diff --git a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java index eeb3d6fbbd75c..51f36d1e25f2b 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java @@ -19,30 +19,25 @@ package org.elasticsearch.rest; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; public class RestFilterChainTests extends ESTestCase { - public void testRestFilters() throws InterruptedException { + public void testRestFilters() throws Exception { RestController restController = new RestController(Settings.EMPTY); @@ -84,7 +79,7 @@ public void handleRequest(RestRequest request, RestChannel channel) throws Excep }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); assertThat(fakeRestChannel.await(), equalTo(true)); @@ -118,7 +113,7 @@ public int compare(TestFilter o1, TestFilter o2) { } } - public void testTooManyContinueProcessing() throws InterruptedException { + public void testTooManyContinueProcessing() throws Exception { final int additionalContinueCount = randomInt(10); @@ -142,65 +137,14 @@ public void handleRequest(RestRequest request, RestChannel channel) throws Excep }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), additionalContinueCount + 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); fakeRestChannel.await(); assertThat(testFilter.runs.get(), equalTo(1)); - assertThat(fakeRestChannel.responses.get(), equalTo(1)); - assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount)); - } - - private static class FakeRestChannel extends AbstractRestChannel { - - private final CountDownLatch latch; - AtomicInteger responses = new AtomicInteger(); - AtomicInteger errors = new AtomicInteger(); - - protected FakeRestChannel(RestRequest request, int responseCount) { - super(request, randomBoolean()); - this.latch = new CountDownLatch(responseCount); - } - - @Override - public XContentBuilder newBuilder() throws IOException { - return super.newBuilder(); - } - - @Override - public XContentBuilder newErrorBuilder() throws IOException { - return super.newErrorBuilder(); - } - - @Override - public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { - return super.newBuilder(autoDetectSource, useFiltering); - } - - @Override - protected BytesStreamOutput newBytesOutput() { - return super.newBytesOutput(); - } - - @Override - public RestRequest request() { - return super.request(); - } - - @Override - public void sendResponse(RestResponse response) { - if (response.status() == RestStatus.OK) { - responses.incrementAndGet(); - } else { - errors.incrementAndGet(); - } - latch.countDown(); - } - - public boolean await() throws InterruptedException { - return latch.await(10, TimeUnit.SECONDS); - } + assertThat(fakeRestChannel.responses().get(), equalTo(1)); + assertThat(fakeRestChannel.errors().get(), equalTo(additionalContinueCount)); } private static enum Operation implements Callback { diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java index 848c62ab2b4a5..4e8ea3b3eb00c 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java new file mode 100644 index 0000000000000..3d1ce29143208 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.rest; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public final class FakeRestChannel extends AbstractRestChannel { + private final CountDownLatch latch; + private final AtomicInteger responses = new AtomicInteger(); + private final AtomicInteger errors = new AtomicInteger(); + + public FakeRestChannel(RestRequest request, boolean detailedErrorsEnabled, int responseCount) { + super(request, detailedErrorsEnabled); + this.latch = new CountDownLatch(responseCount); + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return super.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return super.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return super.newBuilder(autoDetectSource, useFiltering); + } + + @Override + protected BytesStreamOutput newBytesOutput() { + return super.newBytesOutput(); + } + + @Override + public RestRequest request() { + return super.request(); + } + + @Override + public void sendResponse(RestResponse response) { + if (response.status() == RestStatus.OK) { + responses.incrementAndGet(); + } else { + errors.incrementAndGet(); + } + latch.countDown(); + } + + public boolean await() throws InterruptedException { + return latch.await(10, TimeUnit.SECONDS); + } + + public AtomicInteger responses() { + return responses; + } + + public AtomicInteger errors() { + return errors; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 14075b5254b8c..66167bc7ec89a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -31,6 +31,8 @@ public class FakeRestRequest extends RestRequest { private final Map params; + private final BytesReference content; + public FakeRestRequest() { this(new HashMap<>()); } @@ -40,8 +42,13 @@ public FakeRestRequest(Map headers) { } public FakeRestRequest(Map headers, Map params) { + this(headers, params, null); + } + + public FakeRestRequest(Map headers, Map params, BytesReference content) { this.headers = headers; this.params = params; + this.content = content; } @Override @@ -61,12 +68,12 @@ public String rawPath() { @Override public boolean hasContent() { - return false; + return content != null; } @Override public BytesReference content() { - return null; + return content; } @Override