From ddbfda2c68e6be7067bc8d9f779c3e75289eb99c Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Fri, 13 May 2016 14:21:24 +0200 Subject: [PATCH] Exclude specific transport actions from request size limit check We add support to explicitly exclude specific transport actions from the request size limit check. We also exclude the following request types currently: *MasterPingRequest * PingRequest --- .../health/TransportClusterHealthAction.java | 3 +- .../state/TransportClusterStateAction.java | 2 +- .../support/HandledTransportAction.java | 13 +- .../master/TransportMasterNodeAction.java | 10 +- .../master/TransportMasterNodeReadAction.java | 9 +- .../support/nodes/TransportNodesAction.java | 3 +- .../TransportReplicationAction.java | 7 +- .../discovery/zen/ZenDiscovery.java | 3 +- .../zen/fd/MasterFaultDetection.java | 3 +- .../discovery/zen/fd/NodesFaultDetection.java | 3 +- .../transport/RequestHandlerRegistry.java | 9 +- .../transport/TransportService.java | 18 +-- .../transport/local/LocalTransport.java | 8 +- .../netty/MessageChannelHandler.java | 11 +- .../discovery/ZenFaultDetectionTests.java | 119 ++++++++++++++++-- .../messy/tests/IndicesRequestTests.java | 4 +- 16 files changed, 188 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 853e6986d02b6..204eab7aaf4e9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -54,7 +54,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) { - super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest::new); + super(settings, ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, ClusterHealthRequest::new); this.clusterName = clusterName; this.gatewayAllocator = gatewayAllocator; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 4bd826237c5d9..77a42108280ae 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -47,7 +47,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction, Response extends ActionResponse> extends TransportAction { - protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { + protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier request) { + this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + } + + protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler()); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker, + new TransportHandler()); } class TransportHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index ee082610d934f..f0ea4ea14f298 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -58,7 +58,15 @@ public abstract class TransportMasterNodeAction request) { - super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request); + } + + protected TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker, + TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Supplier request) { + super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver, + request); this.transportService = transportService; this.clusterService = clusterService; this.executor = executor(); diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java index 5a6ddcfb34efe..ceb8e1a40ac88 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeReadAction.java @@ -46,7 +46,14 @@ public abstract class TransportMasterNodeReadAction request) { - super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request); + this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request); + } + + protected TransportMasterNodeReadAction(Settings settings, String actionName, boolean checkSizeLimit, TransportService transportService, + ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { + super(settings, actionName, checkSizeLimit, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver,request); this.forceLocal = FORCE_LOCAL_SETTING.get(settings); } diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 2767bc80bf3df..39d6bf767e194 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -80,7 +80,8 @@ protected TransportNodesAction(Settings settings, String actionName, ClusterName this.transportNodeAction = actionName + "[n]"; - transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler()); + transportService.registerRequestHandler( + transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler()); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2cc647ddd5a4e..8a721dfe50868 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -92,8 +92,9 @@ public abstract class TransportReplicationAction { private final String action; private final TransportRequestHandler handler; private final boolean forceExecution; + private final boolean canTripCircuitBreaker; private final String executor; private final Supplier requestFactory; private final TaskManager taskManager; public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, - TransportRequestHandler handler, String executor, boolean forceExecution) { + TransportRequestHandler handler, String executor, boolean forceExecution, + boolean canTripCircuitBreaker) { this.action = action; this.requestFactory = requestFactory; assert newRequest() != null; this.handler = handler; this.forceExecution = forceExecution; + this.canTripCircuitBreaker = canTripCircuitBreaker; this.executor = executor; this.taskManager = taskManager; } @@ -77,6 +80,10 @@ public boolean isForceExecution() { return forceExecution; } + public boolean canTripCircuitBreaker() { + return canTripCircuitBreaker; + } + public String getExecutor() { return executor; } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 820cdfc130aee..f53d63740ffe3 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -580,23 +580,27 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi */ public void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestFactory, taskManager, handler, executor, false, true); registerRequestHandler(reg); } /** * Registers a new request handler * - * @param action The action the request handler is associated with - * @param request The request class that will be used to constrcut new instances for streaming - * @param executor The executor the request handling will be executed on - * @param forceExecution Force execution on the executor queue and never reject it - * @param handler The handler itself that implements the request handling + * @param action The action the request handler is associated with + * @param request The request class that will be used to constrcut new instances for streaming + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param handler The handler itself that implements the request handling */ public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, + boolean canTripCircuitBreaker, TransportRequestHandler handler) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); registerRequestHandler(reg); } diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 1c224e1aeef76..d1ac749e4acff 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -297,12 +297,16 @@ private void handleRequest(StreamInput stream, long requestId, int messageLength Version version) throws Exception { stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry); final String action = stream.readString(); + final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); transportServiceAdapter.onRequestReceived(requestId, action); - inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + if (reg != null && reg.canTripCircuitBreaker()) { + inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } else { + inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes); + } final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version, messageLengthBytes); try { - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException("Action [" + action + "] not found"); } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 0cd47b093add0..e45635e3349a2 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -226,14 +226,17 @@ protected String handleRequest(Channel channel, Marker marker, StreamInput buffe transportServiceAdapter.onRequestReceived(requestId, action); NettyTransportChannel transportChannel = null; try { - transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); - transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, - requestId, version, profileName, messageLengthBytes); - final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { throw new ActionNotFoundTransportException(action); } + if (reg.canTripCircuitBreaker()) { + transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, ""); + } else { + transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes); + } + transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, + requestId, version, profileName, messageLengthBytes); final TransportRequest request = reg.newRequest(); request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); request.readFrom(buffer); diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java index e5199048c35f3..1108e8335e932 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java @@ -20,40 +20,53 @@ package org.elasticsearch.discovery; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.discovery.zen.fd.FaultDetection; import org.elasticsearch.discovery.zen.fd.MasterFaultDetection; import org.elasticsearch.discovery.zen.fd.NodesFaultDetection; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportConnectionListener; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService; import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class ZenFaultDetectionTests extends ESTestCase { protected ThreadPool threadPool; - protected ClusterService clusterService; + protected ClusterService clusterServiceA; + protected ClusterService clusterServiceB; private CircuitBreakerService circuitBreakerService; protected static final Version version0 = Version.fromId(/*0*/99); @@ -68,9 +81,14 @@ public class ZenFaultDetectionTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); + Settings settings = Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), new ByteSizeValue(0)) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); threadPool = new ThreadPool(getClass().getName()); - clusterService = createClusterService(threadPool); - circuitBreakerService = new NoneCircuitBreakerService(); + clusterServiceA = createClusterService(threadPool); + clusterServiceB = createClusterService(threadPool); + circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings); serviceA = build(Settings.builder().put("name", "TS_A").build(), version0); nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build(Settings.builder().put("name", "TS_B").build(), version1); @@ -109,7 +127,8 @@ public void tearDown() throws Exception { super.tearDown(); serviceA.close(); serviceB.close(); - clusterService.close(); + clusterServiceA.close(); + clusterServiceB.close(); terminate(threadPool); } @@ -117,7 +136,10 @@ protected MockTransportService build(Settings settings, Version version) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(); MockTransportService transportService = new MockTransportService( - Settings.EMPTY, + Settings.builder() + // trace zenfd actions but keep the default otherwise + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME)) + .build(), new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool, ClusterName.DEFAULT); @@ -183,6 +205,9 @@ public void onNodeFailure(DiscoveryNode node, String reason) { serviceB.stop(); notified.await(30, TimeUnit.SECONDS); + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertEquals(nodeB, failureNode[0]); Matcher matcher = Matchers.containsString("verified"); if (!shouldRetry) { @@ -200,9 +225,9 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m"); ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); - setState(clusterService, state); + setState(clusterServiceA, state); MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, - clusterService); + clusterServiceA); masterFD.start(nodeB, "test"); final String[] failureReason = new String[1]; @@ -217,6 +242,9 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce serviceB.stop(); notified.await(30, TimeUnit.SECONDS); + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertEquals(nodeB, failureNode[0]); Matcher matcher = Matchers.containsString("verified"); if (!shouldRetry) { @@ -225,4 +253,79 @@ public void testMasterFaultDetectionConnectOnDisconnect() throws InterruptedExce assertThat(failureReason[0], matcher); } + + public void testMasterFaultDetectionNotSizeLimited() throws InterruptedException { + Settings.Builder settings = Settings.builder(); + boolean shouldRetry = randomBoolean(); + settings + .put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry) + .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "1s"); + ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20)); + final ClusterState stateNodeA = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build(); + setState(clusterServiceA, stateNodeA); + + int minExpectedPings = 2; + + PingProbe pingProbeA = new PingProbe(minExpectedPings); + PingProbe pingProbeB = new PingProbe(minExpectedPings); + + serviceA.addTracer(pingProbeA); + serviceB.addTracer(pingProbeB); + + MasterFaultDetection masterFDNodeA = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName, + clusterServiceA); + masterFDNodeA.start(nodeB, "test"); + + final ClusterState stateNodeB = ClusterState.builder(clusterName).nodes(buildNodesForB(true)).build(); + setState(clusterServiceB, stateNodeB); + + MasterFaultDetection masterFDNodeB = new MasterFaultDetection(settings.build(), threadPool, serviceB, clusterName, + clusterServiceB); + masterFDNodeB.start(nodeB, "test"); + + // let's do a few pings + pingProbeA.awaitMinCompletedPings(); + pingProbeB.awaitMinCompletedPings(); + + CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L)); + assertThat(pingProbeA.completedPings(), greaterThanOrEqualTo(minExpectedPings)); + assertThat(pingProbeB.completedPings(), greaterThanOrEqualTo(minExpectedPings)); + } + + private static class PingProbe extends MockTransportService.Tracer { + private final Set> inflightPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final CountDownLatch waitForPings; + + public PingProbe(int minCompletedPings) { + this.waitForPings = new CountDownLatch(minCompletedPings); + } + + @Override + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { + inflightPings.add(Tuple.tuple(node, requestId)); + } + } + + @Override + public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) { + if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) { + Tuple ping = Tuple.tuple(sourceNode, requestId); + if (inflightPings.remove(ping)) { + completedPings.add(ping); + waitForPings.countDown(); + } + } + } + + public int completedPings() { + return completedPings.size(); + } + + public void awaitMinCompletedPings() throws InterruptedException { + waitForPings.await(); + } + } } diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index c2bc877b903a0..c0dfcb69eade6 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -743,8 +743,8 @@ synchronized void clearInterceptedActions() { } @Override - public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { - super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler<>(action, handler)); + public void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler handler) { + super.registerRequestHandler(action, request, executor, forceExecution, canTripCircuitBreaker, new InterceptingRequestHandler<>(action, handler)); } @Override