From 57b81b1d54238c307d2a335a18e18dfdf77fac29 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Aug 2020 08:23:08 +0200 Subject: [PATCH 1/4] Fix Blackholed Connection Behavior in DisruptableMockTransport It is not realistic to drop messages without eventually failing. To retain the coverage of long pauses this PR adjusts the blockholed behavior to fail a send after 24h (which is assumed to be longer than any timeout in the system) instead of never. Closes #61034 --- .../elasticsearch/snapshots/SnapshotResiliencyTests.java | 2 +- .../coordination/AbstractCoordinatorTestCase.java | 2 +- .../test/disruption/DisruptableMockTransport.java | 9 ++++++++- .../test/disruption/DisruptableMockTransportTests.java | 4 ++-- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9829b54165f2a..bf70d88b3a072 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1417,7 +1417,7 @@ protected void connectToNodesAndWait(ClusterState newClusterState) { } }); recoverySettings = new RecoverySettings(settings, clusterSettings); - mockTransport = new DisruptableMockTransport(node, logger) { + mockTransport = new DisruptableMockTransport(node, logger, deterministicTaskQueue) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { if (node.equals(destination)) { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 539dff1bc78e3..94712386614e8 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -929,7 +929,7 @@ class ClusterNode { private void setUp() { final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode); - mockTransport = new DisruptableMockTransport(localNode, logger) { + mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) { @Override protected void execute(Runnable runnable) { deterministicTaskQueue.scheduleNow(onNode(runnable)); diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index eae0024729c0c..7b1ab203a21ac 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.ClusterSettings; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.elasticsearch.test.ESTestCase.copyWriteable; @@ -51,10 +53,12 @@ public abstract class DisruptableMockTransport extends MockTransport { private final DiscoveryNode localNode; private final Logger logger; + private final DeterministicTaskQueue deterministicTaskQueue; - public DisruptableMockTransport(DiscoveryNode localNode, Logger logger) { + public DisruptableMockTransport(DiscoveryNode localNode, Logger logger, DeterministicTaskQueue deterministicTaskQueue) { this.localNode = localNode; this.logger = logger; + this.deterministicTaskQueue = deterministicTaskQueue; } protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode destination); @@ -159,6 +163,9 @@ protected String getRequestDescription(long requestId, String action, DiscoveryN protected void onBlackholedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) { logger.trace("dropping {}", getRequestDescription(requestId, action, destinationTransport.getLocalNode())); + // Delaying the request for one day and then disconnect to simulate a very long pause + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + TimeUnit.DAYS.toMillis(1L), + () -> onDisconnectedDuringSend(requestId, action, destinationTransport)); } protected void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) { diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 20bb0c09dc382..11e6de1222217 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -103,7 +103,7 @@ public void initTransports() { deterministicTaskQueue = new DeterministicTaskQueue( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random()); - final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) { + final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger, deterministicTaskQueue) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); @@ -120,7 +120,7 @@ protected void execute(Runnable runnable) { } }; - final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) { + final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger, deterministicTaskQueue) { @Override protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) { return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination); From d0b3d1fe029e6cae5d822bc0ca46febc22d49bd2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Aug 2020 13:03:21 +0200 Subject: [PATCH 2/4] CR: fail responses as well --- .../test/disruption/DisruptableMockTransport.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index 7b1ab203a21ac..a14957e3ea2fa 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -205,8 +205,12 @@ public void run() { break; case BLACK_HOLE: + logger.trace("blackholed during response to {}: channel is {}", requestDescription, connectionStatus); + onBlackholedDuringSend(requestId, action, destinationTransport); + break; case DISCONNECTED: - logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus); + logger.trace("disconnected during response to {}: channel is {}", requestDescription, connectionStatus); + onDisconnectedDuringSend(requestId, action, destinationTransport); break; default: @@ -235,8 +239,14 @@ public void run() { break; case BLACK_HOLE: + logger.trace("blackholed during exception response to {}: channel is {}", + requestDescription, connectionStatus); + onBlackholedDuringSend(requestId, action, destinationTransport); + break; case DISCONNECTED: - logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus); + logger.trace("disconnected during exception response to {}: channel is {}", + requestDescription, connectionStatus); + onDisconnectedDuringSend(requestId, action, destinationTransport); break; default: From 6f4526b0a72371b02451cc46f114dc3de096016b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Aug 2020 13:56:16 +0200 Subject: [PATCH 3/4] fix test --- .../disruption/DisruptableMockTransportTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 11e6de1222217..1b158c5563c76 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -57,6 +57,7 @@ import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.instanceOf; public class DisruptableMockTransportTests extends ESTestCase { @@ -318,13 +319,16 @@ public void testDisconnectedOnSuccessfulResponse() throws IOException { AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - send(service1, node2, responseHandlerShouldNotBeCalled()); + AtomicReference responseHandlerException = new AtomicReference<>(); + send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set)); deterministicTaskQueue.runAllRunnableTasks(); assertNotNull(responseHandlerChannel.get()); + assertNull(responseHandlerException.get()); disconnectedLinks.add(Tuple.tuple(node2, node1)); responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE); deterministicTaskQueue.runAllRunnableTasks(); + assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class)); } public void testDisconnectedOnExceptionalResponse() throws IOException { @@ -332,13 +336,16 @@ public void testDisconnectedOnExceptionalResponse() throws IOException { AtomicReference responseHandlerChannel = new AtomicReference<>(); registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set)); - send(service1, node2, responseHandlerShouldNotBeCalled()); + AtomicReference responseHandlerException = new AtomicReference<>(); + send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set)); deterministicTaskQueue.runAllRunnableTasks(); assertNotNull(responseHandlerChannel.get()); + assertNull(responseHandlerException.get()); disconnectedLinks.add(Tuple.tuple(node2, node1)); responseHandlerChannel.get().sendResponse(new Exception()); deterministicTaskQueue.runAllRunnableTasks(); + assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class)); } public void testUnavailableOnSuccessfulResponse() throws IOException { From b181920133aecbb0cb3173c13dba2b258af06858 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 20 Aug 2020 15:16:14 +0200 Subject: [PATCH 4/4] CR: delay it all --- .../test/disruption/DisruptableMockTransport.java | 15 ++++----------- .../disruption/DisruptableMockTransportTests.java | 4 ++-- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index a14957e3ea2fa..a3a46ca20a459 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -205,12 +205,9 @@ public void run() { break; case BLACK_HOLE: - logger.trace("blackholed during response to {}: channel is {}", requestDescription, connectionStatus); - onBlackholedDuringSend(requestId, action, destinationTransport); - break; case DISCONNECTED: - logger.trace("disconnected during response to {}: channel is {}", requestDescription, connectionStatus); - onDisconnectedDuringSend(requestId, action, destinationTransport); + logger.trace("delaying response to {}: channel is {}", requestDescription, connectionStatus); + onBlackholedDuringSend(requestId, action, destinationTransport); break; default: @@ -239,14 +236,10 @@ public void run() { break; case BLACK_HOLE: - logger.trace("blackholed during exception response to {}: channel is {}", - requestDescription, connectionStatus); - onBlackholedDuringSend(requestId, action, destinationTransport); - break; case DISCONNECTED: - logger.trace("disconnected during exception response to {}: channel is {}", + logger.trace("delaying exception response to {}: channel is {}", requestDescription, connectionStatus); - onDisconnectedDuringSend(requestId, action, destinationTransport); + onBlackholedDuringSend(requestId, action, destinationTransport); break; default: diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 1b158c5563c76..5f7f2085a2c40 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -327,7 +327,7 @@ public void testDisconnectedOnSuccessfulResponse() throws IOException { disconnectedLinks.add(Tuple.tuple(node2, node1)); responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE); - deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.runAllTasks(); assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class)); } @@ -344,7 +344,7 @@ public void testDisconnectedOnExceptionalResponse() throws IOException { disconnectedLinks.add(Tuple.tuple(node2, node1)); responseHandlerChannel.get().sendResponse(new Exception()); - deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.runAllTasks(); assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class)); }