From 6bf4b6daf791b612adc86acf4622a926d61ee080 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 8 Jun 2017 09:22:07 +0200 Subject: [PATCH 1/2] Break out clear scroll logic from TransportClearScrollAction This change extracts the main logic from `TransportClearScrollAction` into a new class `ClearScrollController` and adds a corresponding unittest. Relates to #25094 --- .../action/search/ClearScrollController.java | 141 ++++++++++ .../action/search/SearchTransportService.java | 12 +- .../search/TransportClearScrollAction.java | 111 +------- .../search/ClearScrollControllerTests.java | 251 ++++++++++++++++++ .../action/search/SearchAsyncActionTests.java | 2 +- 5 files changed, 403 insertions(+), 114 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java new file mode 100644 index 0000000000000..d94fe1a2bbe6b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportResponse; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; + +final class ClearScrollController implements Runnable { + private final DiscoveryNodes nodes; + private final SearchTransportService searchTransportService; + private final CountDown expectedOps; + private final ActionListener listener; + private final AtomicBoolean hasFailed = new AtomicBoolean(false); + private final AtomicInteger freedSearchContexts = new AtomicInteger(0); + private final Logger logger; + private final Runnable runner; + + ClearScrollController(ClearScrollRequest request, ActionListener listener, DiscoveryNodes nodes, Logger logger, + SearchTransportService searchTransportService) { + this.nodes = nodes; + this.logger = logger; + this.searchTransportService = searchTransportService; + this.listener = listener; + List scrollIds = request.getScrollIds(); + final int expectedOps; + if (scrollIds.size() == 1 && "_all".equals(scrollIds.get(0))) { + expectedOps = nodes.getSize(); + runner = this::cleanAllScrolls; + } else { + List parsedScrollIds = new ArrayList<>(); + for (String parsedScrollId : request.getScrollIds()) { + ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext(); + for (ScrollIdForNode id : context) { + parsedScrollIds.add(id); + } + } + if (parsedScrollIds.isEmpty()) { + expectedOps = 0; + runner = () -> listener.onResponse(new ClearScrollResponse(true, 0)); + } else { + expectedOps = parsedScrollIds.size(); + runner = () -> cleanScrollIds(parsedScrollIds); + } + } + this.expectedOps = new CountDown(expectedOps); + + } + + @Override + public void run() { + runner.run(); + } + + void cleanAllScrolls() { + for (final DiscoveryNode node : nodes) { + try { + Transport.Connection connection = searchTransportService.getConnection(null, node); + searchTransportService.sendClearAllScrollContexts(connection, new ActionListener() { + @Override + public void onResponse(TransportResponse response) { + onFreedContext(true); + } + + @Override + public void onFailure(Exception e) { + onFailedFreedContext(e, node); + } + }); + } catch (Exception e) { + onFailedFreedContext(e, node); + } + } + } + + void cleanScrollIds(List parsedScrollIds) { + for (ScrollIdForNode target : parsedScrollIds) { + final DiscoveryNode node = nodes.get(target.getNode()); + if (node == null) { + onFreedContext(false); + } else { + try { + Transport.Connection connection = searchTransportService.getConnection(null, node); + searchTransportService.sendFreeContext(connection, target.getScrollId(), + ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), + e -> onFailedFreedContext(e, node))); + } catch (Exception e) { + onFailedFreedContext(e, node); + } + } + } + } + + private void onFreedContext(boolean freed) { + if (freed) { + freedSearchContexts.incrementAndGet(); + } + if (expectedOps.countDown()) { + boolean succeeded = hasFailed.get() == false; + listener.onResponse(new ClearScrollResponse(succeeded, freedSearchContexts.get())); + } + } + + private void onFailedFreedContext(Throwable e, DiscoveryNode node) { + logger.warn((Supplier) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e); + if (expectedOps.countDown()) { + listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get())); + } else { + hasFailed.set(true); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 9dd2125d5e2fe..2d20d383288f4 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -98,14 +98,14 @@ public void onFailure(Exception e) { }, SearchFreeContextResponse::new)); } - public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener listener) { - transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), - new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); + public void sendFreeContext(Transport.Connection connection, long contextId, final ActionListener listener) { + transportService.sendRequest(connection, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); } - public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { - transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, - new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { + transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 716077c915d6b..d9afbdacafe3c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -19,30 +19,16 @@ package org.elasticsearch.action.search; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; - public class TransportClearScrollAction extends HandledTransportAction { private final ClusterService clusterService; @@ -53,105 +39,16 @@ public TransportClearScrollAction(Settings settings, TransportService transportS ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, SearchTransportService searchTransportService) { - super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new); + super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + ClearScrollRequest::new); this.clusterService = clusterService; this.searchTransportService = searchTransportService; } @Override protected void doExecute(ClearScrollRequest request, final ActionListener listener) { - new Async(request, listener, clusterService.state()).run(); - } - - private class Async { - final DiscoveryNodes nodes; - final CountDown expectedOps; - final List contexts = new ArrayList<>(); - final ActionListener listener; - final AtomicReference expHolder; - final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0); - - private Async(ClearScrollRequest request, ActionListener listener, ClusterState clusterState) { - int expectedOps = 0; - this.nodes = clusterState.nodes(); - if (request.getScrollIds().size() == 1 && "_all".equals(request.getScrollIds().get(0))) { - expectedOps = nodes.getSize(); - } else { - for (String parsedScrollId : request.getScrollIds()) { - ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext(); - expectedOps += context.length; - this.contexts.add(context); - } - } - this.listener = listener; - this.expHolder = new AtomicReference<>(); - this.expectedOps = new CountDown(expectedOps); - } - - public void run() { - if (expectedOps.isCountedDown()) { - listener.onResponse(new ClearScrollResponse(true, 0)); - return; - } - - if (contexts.isEmpty()) { - for (final DiscoveryNode node : nodes) { - searchTransportService.sendClearAllScrollContexts(node, new ActionListener() { - @Override - public void onResponse(TransportResponse response) { - onFreedContext(true); - } - - @Override - public void onFailure(Exception e) { - onFailedFreedContext(e, node); - } - }); - } - } else { - for (ScrollIdForNode[] context : contexts) { - for (ScrollIdForNode target : context) { - final DiscoveryNode node = nodes.get(target.getNode()); - if (node == null) { - onFreedContext(false); - continue; - } - - searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener() { - @Override - public void onResponse(SearchTransportService.SearchFreeContextResponse freed) { - onFreedContext(freed.isFreed()); - } - - @Override - public void onFailure(Exception e) { - onFailedFreedContext(e, node); - } - }); - } - } - } - } - - void onFreedContext(boolean freed) { - if (freed) { - numberOfFreedSearchContexts.incrementAndGet(); - } - if (expectedOps.countDown()) { - boolean succeeded = expHolder.get() == null; - listener.onResponse(new ClearScrollResponse(succeeded, numberOfFreedSearchContexts.get())); - } - } - - void onFailedFreedContext(Throwable e, DiscoveryNode node) { - logger.warn((Supplier) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e); - if (expectedOps.countDown()) { - listener.onResponse(new ClearScrollResponse(false, numberOfFreedSearchContexts.get())); - } else { - expHolder.set(e); - } - } - + Runnable runnable = new ClearScrollController(request, listener, clusterService.state().nodes(), logger, searchTransportService); + runnable.run(); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java new file mode 100644 index 0000000000000..e97da9b248377 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -0,0 +1,251 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class ClearScrollControllerTests extends ESTestCase { + + public void testClearAll() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(3, clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + @Override + public void sendClearAllScrollContexts(DiscoveryNode node, ActionListener listener) { + nodesInvoked.add(node); + Thread t = new Thread(() -> listener.onResponse(TransportResponse.Empty.INSTANCE)); // response is unused + t.start(); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList("_all")); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3, nodesInvoked.size()); + Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId)); + assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3)); + } + + + public void testClearScrollIds() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + AtomicInteger numFreed = new AtomicInteger(0); + String scrollId = TransportSearchHelper.buildScrollId(array); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); + } finally { + latch.countDown(); + } + + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + + @Override + public void sendFreeContext(Transport.Connection connection, long contextId, + ActionListener listener) { + nodesInvoked.add(connection.getNode()); + boolean freed = randomBoolean(); + if (freed) { + numFreed.incrementAndGet(); + } + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(freed))); + t.start(); + } + + @Override + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList(scrollId)); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3, nodesInvoked.size()); + Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId)); + assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3)); + } + + public void testClearScrollIdsWithFailure() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + AtomicInteger numFreed = new AtomicInteger(0); + AtomicInteger numFailures = new AtomicInteger(0); + AtomicInteger numConnectionFailures = new AtomicInteger(0); + String scrollId = TransportSearchHelper.buildScrollId(array); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + if (numFailures.get() > 0) { + assertFalse(clearScrollResponse.isSucceeded()); + } else { + assertTrue(clearScrollResponse.isSucceeded()); + } + + } finally { + latch.countDown(); + } + + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + + @Override + public void sendFreeContext(Transport.Connection connection, long contextId, + ActionListener listener) { + nodesInvoked.add(connection.getNode()); + boolean freed = randomBoolean(); + boolean fail = randomBoolean(); + Thread t = new Thread(() -> { + if (fail) { + numFailures.incrementAndGet(); + listener.onFailure(new IllegalArgumentException("boom")); + } else { + if (freed) { + numFreed.incrementAndGet(); + } + listener.onResponse(new SearchFreeContextResponse(freed)); + } + }); + t.start(); + } + + @Override + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + if (randomBoolean()) { + numFailures.incrementAndGet(); + numConnectionFailures.incrementAndGet(); + throw new NodeNotConnectedException(node, "boom"); + } + return new SearchAsyncActionTests.MockConnection(node); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList(scrollId)); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3 - numConnectionFailures.get(), nodesInvoked.size()); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 39890038f2a2a..878cb7e61266b 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -214,7 +214,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - public final class MockConnection implements Transport.Connection { + public static final class MockConnection implements Transport.Connection { private final DiscoveryNode node; From cd72490024841331539a726b5ea36c3909bb75b2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 8 Jun 2017 09:36:55 +0200 Subject: [PATCH 2/2] fix compilation issue --- .../action/search/ClearScrollControllerTests.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index e97da9b248377..5037ffe03f962 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -72,11 +72,16 @@ public void onFailure(Exception e) { List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { @Override - public void sendClearAllScrollContexts(DiscoveryNode node, ActionListener listener) { - nodesInvoked.add(node); + public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener listener) { + nodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(TransportResponse.Empty.INSTANCE)); // response is unused t.start(); } + + @Override + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } }; ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.scrollIds(Arrays.asList("_all"));