diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 853e6986d02b6..204eab7aaf4e9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -54,7 +54,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) { - super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest::new); + super(settings, ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, ClusterHealthRequest::new); this.clusterName = clusterName; this.gatewayAllocator = gatewayAllocator; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 4bd826237c5d9..77a42108280ae 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -47,7 +47,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction, Response extends ActionResponse> extends TransportAction { - protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService 
transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { + protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier request) { + this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + } + + protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker, + new TransportHandler()); } class TransportHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index ee082610d934f..f0ea4ea14f298 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -58,7 +58,15 @@ public abstract class TransportMasterNodeAction request) { - super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request); + } + + protected 
TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker, + TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier request) { + super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver, + request); this.transportService = transportService; this.clusterService = clusterService; this.executor = executor(); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java index 5a6ddcfb34efe..ceb8e1a40ac88 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java @@ -46,7 +46,14 @@ public abstract class TransportMasterNodeReadAction request) { - super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request); + this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request); + } + + protected TransportMasterNodeReadAction(Settings settings, String actionName, boolean checkSizeLimit, TransportService transportService, + ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { + super(settings, actionName, checkSizeLimit, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver,request); this.forceLocal = FORCE_LOCAL_SETTING.get(settings); } diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java 
b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 2767bc80bf3df..39d6bf767e194 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -80,7 +80,8 @@ protected TransportNodesAction(Settings settings, String actionName, ClusterName this.transportNodeAction = actionName + "[n]"; - transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler()); + transportService.registerRequestHandler( + transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler()); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2cc647ddd5a4e..8a721dfe50868 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -92,8 +92,9 @@ public abstract class TransportReplicationAction { private final String action; private final TransportRequestHandler handler; private final boolean forceExecution; + private final boolean canTripCircuitBreaker; private final String executor; private final Supplier requestFactory; private final TaskManager taskManager; public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, - TransportRequestHandler handler, String executor, boolean forceExecution) { + TransportRequestHandler handler, String executor, boolean forceExecution, + boolean canTripCircuitBreaker) { this.action = action; this.requestFactory = requestFactory; assert newRequest() != null; this.handler = handler; this.forceExecution = forceExecution; + this.canTripCircuitBreaker = canTripCircuitBreaker; 
this.executor = executor; this.taskManager = taskManager; } @@ -77,6 +80,10 @@ public boolean isForceExecution() { return forceExecution; } + public boolean canTripCircuitBreaker() { + return canTripCircuitBreaker; + } + public String getExecutor() { return executor; } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 820cdfc130aee..f53d63740ffe3 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -580,23 +580,27 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi */ public void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestFactory, taskManager, handler, executor, false, true); registerRequestHandler(reg); } /** * Registers a new request handler * - * @param action The action the request handler is associated with - * @param request The request class that will be used to constrcut new instances for streaming - * @param executor The executor the request handling will be executed on - * @param forceExecution Force execution on the executor queue and never reject it - * @param handler The handler itself that implements the request handling + * @param action The action the request handler is associated with + * @param request The request class that will be used to construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is 
breached. + * @param handler The handler itself that implements the request handling */ public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, + boolean canTripCircuitBreaker, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); registerRequestHandler(reg); } diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 1c224e1aeef76..d1ac749e4acff 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -297,12 +297,16 @@ private void handleRequest(StreamInput stream, long requestId, int messageLength Version version) throws Exception { stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry); final String action = stream.readString(); + final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); transportServiceAdapter.onRequestReceived(requestId, action); - inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + if (reg != null && reg.canTripCircuitBreaker()) { + inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } else { + inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes); + } final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version, messageLengthBytes); try { - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException("Action [" + action + "] not 
found"); } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 0cd47b093add0..e45635e3349a2 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -226,14 +226,17 @@ protected String handleRequest(Channel channel, Marker marker, StreamInput buffe transportServiceAdapter.onRequestReceived(requestId, action); NettyTransportChannel transportChannel = null; try { - transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); - transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, - requestId, version, profileName, messageLengthBytes); - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException(action); } + if (reg.canTripCircuitBreaker()) { + transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } else { + transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes); + } + transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, + requestId, version, profileName, messageLengthBytes); final TransportRequest request = reg.newRequest(); request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); request.readFrom(buffer); diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index e5199048c35f3..1108e8335e932 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -20,40 +20,53 @@ package 
org.elasticsearch.discovery; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static 
org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService; import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class ZenFaultDetectionTests extends ESTestCase { protected ThreadPool threadPool; - protected ClusterService clusterService; + protected ClusterService clusterServiceA; + protected ClusterService clusterServiceB; private CircuitBreakerService circuitBreakerService; protected static final Version version0 = Version.fromId(/*0*/99); @@ -68,9 +81,14 @@ public class ZenFaultDetectionTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); + Settings settings = Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), new ByteSizeValue(0)) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); threadPool = new ThreadPool(getClass().getName()); - clusterService = createClusterService(threadPool); - circuitBreakerService = new NoneCircuitBreakerService(); + clusterServiceA = createClusterService(threadPool); + clusterServiceB = createClusterService(threadPool); + circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings); serviceA = build(Settings.builder().put("name", "TS_A").build(), version0); nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build(Settings.builder().put("name", "TS_B").build(), version1); @@ -109,7 +127,8 @@ public void tearDown() throws Exception { super.tearDown(); serviceA.close(); serviceB.close(); - clusterService.close(); + clusterServiceA.close(); + clusterServiceB.close(); terminate(threadPool); } @@ -117,7 +136,10 @@ protected MockTransportService build(Settings settings, Version version) { NamedWriteableRegistry 
namedWriteableRegistry = new NamedWriteableRegistry(); MockTransportService transportService = new MockTransportService( - Settings.EMPTY, + Settings.builder() + // trace zenfd actions but keep the default otherwise + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME)) + .build(), new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool, ClusterName.DEFAULT); @@ -183,6 +205,9 @@ public void onNodeFailure(DiscoveryNode node, String reason) { serviceB.stop(); notified.await(30, TimeUnit.SECONDS); + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertEquals(nodeB, failureNode[0]); Matcher matcher = Matchers.containsString("verified"); if (!shouldRetry) { @@ -200,9 +225,9 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m"); ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); - setState(clusterService, state); + setState(clusterServiceA, state); MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, - clusterService); + clusterServiceA); masterFD.start(nodeB, "test"); final String[] failureReason = new String[1]; @@ -217,6 +242,9 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce serviceB.stop(); notified.await(30, TimeUnit.SECONDS); + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertEquals(nodeB, failureNode[0]); Matcher matcher = Matchers.containsString("verified"); if (!shouldRetry) { @@ -225,4 +253,79 
@@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce assertThat(failureReason[0], matcher); } + + public void testMasterFaultDetectionNotSizeLimited() throws InterruptedException { + Settings.Builder settings = Settings.builder(); + boolean shouldRetry = randomBoolean(); + settings + .put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry) + .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "1s"); + ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); + final ClusterState stateNodeA = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); + setState(clusterServiceA, stateNodeA); + + int minExpectedPings = 2; + + PingProbe pingProbeA = new PingProbe(minExpectedPings); + PingProbe pingProbeB = new PingProbe(minExpectedPings); + + serviceA.addTracer(pingProbeA); + serviceB.addTracer(pingProbeB); + + MasterFaultDetection masterFDNodeA = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, + clusterServiceA); + masterFDNodeA.start(nodeB, "test"); + + final ClusterState stateNodeB = ClusterState.builder(clusterName).nodes(buildNodesForB(true)).build(); + setState(clusterServiceB, stateNodeB); + + MasterFaultDetection masterFDNodeB = new MasterFaultDetection(settings.build(), threadPool, serviceB, clusterName, + clusterServiceB); + masterFDNodeB.start(nodeB, "test"); + + // let's do a few pings + pingProbeA.awaitMinCompletedPings(); + pingProbeB.awaitMinCompletedPings(); + + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertThat(pingProbeA.completedPings(), greaterThanOrEqualTo(minExpectedPings)); + assertThat(pingProbeB.completedPings(), greaterThanOrEqualTo(minExpectedPings)); + } + + private static class PingProbe extends MockTransportService.Tracer { + private final Set> inflightPings = 
Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final CountDownLatch waitForPings; + + public PingProbe(int minCompletedPings) { + this.waitForPings = new CountDownLatch(minCompletedPings); + } + + @Override + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { + inflightPings.add(Tuple.tuple(node, requestId)); + } + } + + @Override + public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { + if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { + Tuple ping = Tuple.tuple(sourceNode, requestId); + if (inflightPings.remove(ping)) { + completedPings.add(ping); + waitForPings.countDown(); + } + } + } + + public int completedPings() { + return completedPings.size(); + } + + public void awaitMinCompletedPings() throws InterruptedException { + waitForPings.await(); + } + } } diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index c2bc877b903a0..c0dfcb69eade6 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -743,8 +743,8 @@ synchronized void clearInterceptedActions() { } @Override - public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { - super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler<>(action, handler)); + public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler 
handler) { + super.registerRequestHandler(action, request, executor, forceExecution, canTripCircuitBreaker, new InterceptingRequestHandler<>(action, handler)); } @Override