Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/java/org/elasticsearch/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadCont
RestChannel responseChannel = channel;
try {
int contentLength = request.content().length();
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
if (restController.canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
restController.dispatchRequest(request, responseChannel, threadContext);
Expand Down
59 changes: 30 additions & 29 deletions core/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,30 +113,14 @@ public synchronized void registerFilter(RestFilter preProcessor) {
}

/**
* Registers a rest handler to be execute when the provided method and path match the request.
* Registers a rest handler to be executed when the provided method and path match the request.
*/
public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
switch (method) {
case GET:
getHandlers.insert(path, handler);
break;
case DELETE:
deleteHandlers.insert(path, handler);
break;
case POST:
postHandlers.insert(path, handler);
break;
case PUT:
putHandlers.insert(path, handler);
break;
case OPTIONS:
optionsHandlers.insert(path, handler);
break;
case HEAD:
headHandlers.insert(path, handler);
break;
default:
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
PathTrie<RestHandler> handlers = getHandlersForMethod(method);
if (handlers != null) {
handlers.insert(path, handler);
} else {
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
}
}

Expand All @@ -159,6 +143,15 @@ public RestFilterChain filterChain(RestFilter executionFilter) {
return new ControllerFilterChain(executionFilter);
}

/**
* @param request The current request. Must not be null.
* @return true iff the circuit breaker limit must be enforced for processing this request.
*/
public boolean canTripCircuitBreaker(RestRequest request) {
RestHandler handler = getHandler(request);
return (handler != null) ? handler.canTripCircuitBreaker() : true;
}

public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception {
if (!checkRequestParameters(request, channel)) {
return;
Expand Down Expand Up @@ -226,19 +219,27 @@ void executeHandler(RestRequest request, RestChannel channel) throws Exception {

private RestHandler getHandler(RestRequest request) {
String path = getPath(request);
RestRequest.Method method = request.method();
PathTrie<RestHandler> handlers = getHandlersForMethod(request.method());
if (handlers != null) {
return handlers.retrieve(path, request.params());
} else {
return null;
}
}

private PathTrie<RestHandler> getHandlersForMethod(RestRequest.Method method) {
if (method == RestRequest.Method.GET) {
return getHandlers.retrieve(path, request.params());
return getHandlers;
} else if (method == RestRequest.Method.POST) {
return postHandlers.retrieve(path, request.params());
return postHandlers;
} else if (method == RestRequest.Method.PUT) {
return putHandlers.retrieve(path, request.params());
return putHandlers;
} else if (method == RestRequest.Method.DELETE) {
return deleteHandlers.retrieve(path, request.params());
return deleteHandlers;
} else if (method == RestRequest.Method.HEAD) {
return headHandlers.retrieve(path, request.params());
return headHandlers;
} else if (method == RestRequest.Method.OPTIONS) {
return optionsHandlers.retrieve(path, request.params());
return optionsHandlers;
} else {
return null;
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@
public interface RestHandler {

void handleRequest(RestRequest request, RestChannel channel) throws Exception;
}

default boolean canTripCircuitBreaker() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<ClusterHealthResponse>(channel));
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public RestResponse buildResponse(NodesHotThreadsResponse response) throws Excep
}
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,

client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,

client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
ActionListener<CancelTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,9 @@ public void onFailure(Throwable e) {
}
};
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public RestResponse buildResponse(ClusterStateResponse response, XContentBuilder
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}

private XContentBuilder renderResponse(ClusterState state, boolean renderDefaults, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ protected void addCustomFields(XContentBuilder builder, ClusterUpdateSettingsRes
}
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public RestResponse buildResponse(ClusterStateResponse response, XContentBuilder
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}

static final class Fields {
static final String CLUSTER_NAME = "cluster_name";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
clusterStatsRequest.timeout(request.param("timeout"));
client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public RestResponse buildResponse(ClearIndicesCacheResponse response, XContentBu
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}

public static ClearIndicesCacheRequest fromRequest(final RestRequest request, ClearIndicesCacheRequest clearIndicesCacheRequest, ParseFieldMatcher parseFieldMatcher) {

for (Map.Entry<String, String> entry : request.params().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ public RestResponse buildResponse(IndicesStatsResponse response, XContentBuilder
}
});
}

@Override
public boolean canTripCircuitBreaker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ private static RestRequest toRestRequest(ClusterRerouteRequest original) throws
}
builder.endObject();

return new FakeRestRequest(emptyMap(), params, hasBody ? builder.bytes() : null);
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder();
requestBuilder.withParams(params);
if (hasBody) {
requestBuilder.withContent(builder.bytes());
}
return requestBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,22 @@ public Collection<HttpResponse> get(SocketAddress remoteAddress, String... uris)
@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
public final Collection<HttpResponse> post(SocketAddress remoteAddress, Tuple<String, CharSequence>... urisAndBodies)
throws InterruptedException {
return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
}

@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
public final Collection<HttpResponse> put(SocketAddress remoteAddress, Tuple<String, CharSequence>... urisAndBodies)
throws InterruptedException {
return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
}

@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
private final Collection<HttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, Tuple<String,
CharSequence>... urisAndBodies) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.length);
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1());
HttpRequest request = new DefaultHttpRequest(HTTP_1_1, method, uriAndBody.v1());
request.headers().add(HOST, "localhost");
request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes());
request.setContent(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.util.Collection;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -89,9 +89,35 @@ public void testLimitsInFlightRequests() throws Exception {
}
}

public void testDoesNotLimitExcludedRequests() throws Exception {
ensureGreen();

@SuppressWarnings("unchecked")
Tuple<String, CharSequence>[] requestUris = new Tuple[1500];
for (int i = 0; i < requestUris.length; i++) {
requestUris[i] = Tuple.tuple("/_cluster/settings",
"{ \"transient\": {\"indices.ttl.interval\": \"40s\" } }");
}

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress
().boundAddresses());

try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.put(inetSocketTransportAddress.address(), requestUris);
assertThat(responses, hasSize(requestUris.length));
assertAllInExpectedStatus(responses, HttpResponseStatus.OK);
}
}

private void assertAtLeastOnceExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count();
assertThat(countResponseErrors, greaterThan(0L));
long countExpectedStatus = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count();
assertThat("Expected at least one request with status [" + expectedStatus + "]", countExpectedStatus, greaterThan(0L));
}

private void assertAllInExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
long countUnexpectedStatus = responses.stream().filter(r -> r.getStatus().equals(expectedStatus) == false).count();
assertThat("Expected all requests with status [" + expectedStatus + "] but [" + countUnexpectedStatus +
"] requests had a different one", countUnexpectedStatus, equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,39 @@ void executeHandler(RestRequest request, RestChannel channel) throws Exception {
restHeaders.put("header.1", "true");
restHeaders.put("header.2", "true");
restHeaders.put("header.3", "false");
restController.dispatchRequest(new FakeRestRequest(restHeaders), null, threadContext);
restController.dispatchRequest(new FakeRestRequest.Builder().withHeaders(restHeaders).build(), null, threadContext);
assertNull(threadContext.getHeader("header.1"));
assertNull(threadContext.getHeader("header.2"));
assertEquals("true", threadContext.getHeader("header.3"));
}

public void testCanTripCircuitBreaker() throws Exception {
RestController controller = new RestController(Settings.EMPTY);
// trip circuit breaker by default
controller.registerHandler(RestRequest.Method.GET, "/trip", new FakeRestHandler(true));
controller.registerHandler(RestRequest.Method.GET, "/do-not-trip", new FakeRestHandler(false));

assertTrue(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/trip").build()));
// assume trip even on unknown paths
assertTrue(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/unknown-path").build()));
assertFalse(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/do-not-trip").build()));
}

private static class FakeRestHandler implements RestHandler {
private final boolean canTripCircuitBreaker;

private FakeRestHandler(boolean canTripCircuitBreaker) {
this.canTripCircuitBreaker = canTripCircuitBreaker;
}

@Override
public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
//no op
}

@Override
public boolean canTripCircuitBreaker() {
return canTripCircuitBreaker;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -82,7 +81,7 @@ public void testGetResponse() throws Exception {
if (prettyPrint == false) {
params.put("pretty", String.valueOf(prettyPrint));
}
RestRequest restRequest = new FakeRestRequest(Collections.emptyMap(), params);
RestRequest restRequest = new FakeRestRequest.Builder().withParams(params).build();

BytesRestResponse response = RestMainAction.convertMainResponse(mainResponse, restRequest, builder);
assertNotNull(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testThatDisplayHeadersWithoutTimestamp() throws Exception {
}

private RestResponse assertResponseContentType(Map<String, String> headers, String mediaType) throws Exception {
FakeRestRequest requestWithAcceptHeader = new FakeRestRequest(headers);
FakeRestRequest requestWithAcceptHeader = new FakeRestRequest.Builder().withHeaders(headers).build();
table.startRow();
table.addCell("foo");
table.addCell("foo");
Expand Down
Loading