diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 467b2c7f3c8fd..25525de7fbf46 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -218,10 +218,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil } } + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + throw new ConnectTransportException(node, "DISCONNECT: simulated"); + } + @Override protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { - simulateDisconnect(connection, original, "DISCONNECT: simulated"); + connection.close(); + // send the request, which will blow up + connection.sendRequest(requestId, action, request, options); } }); } @@ -256,19 +263,12 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final addDelegate(transportAddress, new DelegateTransport(original) { - @Override - public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, - CheckedBiConsumer connectionValidator) - throws ConnectTransportException { - original.connectToNode(node, connectionProfile, connectionValidator); - } - @Override protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { if (blockedActions.contains(action)) { logger.info("--> preventing {} request", action); - simulateDisconnect(connection, original, "DISCONNECT: prevented " + action + " request"); + connection.close(); } connection.sendRequest(requestId, action, request, options); } @@ -302,6 +302,11 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil } } + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + @Override protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { @@ -368,6 +373,28 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil } } + @Override + public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + TimeValue delay = getDelay(); + if (delay.millis() <= 0) { + return original.openConnection(node, profile); + } + + // TODO: Replace with proper setting + TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY); + try { + if (delay.millis() < connectingTimeout.millis()) { + Thread.sleep(delay.millis()); + return original.openConnection(node, profile); + } else { + Thread.sleep(connectingTimeout.millis()); + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + } catch (InterruptedException e) { + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + } + @Override protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException { @@ -449,37 +476,6 @@ private LookupTestTransport transport() { return (LookupTestTransport) transport; } - /** - * simulates a disconnect by disconnecting from the underlying transport and throwing a - * {@link ConnectTransportException} - */ - private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason) { - simulateDisconnect(node, transport, reason, null); - } - - /** - * simulates a disconnect by disconnecting from the underlying transport and throwing a - * {@link ConnectTransportException}, due to a specific cause exception - */ - private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason, @Nullable Throwable e) { - if (transport.nodeConnected(node)) { - // this a connected node, disconnecting from it will be up the exception - transport.disconnectFromNode(node); - } else { - throw new ConnectTransportException(node, reason, e); - } - } - - /** - * simulates a disconnect by closing the connection and throwing a - * {@link ConnectTransportException} - */ - private void simulateDisconnect(Transport.Connection connection, Transport transport, String reason) throws IOException { - connection.close(); - simulateDisconnect(connection.getNode(), transport, reason); - } - - /** * A lookup transport that has a list of potential Transport implementations to delegate to for node operations, * if none is registered, then the default one is used. diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 206cfeeb62ecd..7c0070e0f967f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.node.Node; @@ -61,7 +60,6 @@ import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1463,12 +1461,7 @@ public void handleException(TransportException exp) { // all is well } - try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) { - serviceB.handshake(connection, 100); - fail("exception should be thrown"); - } catch (IllegalStateException e) { - // all is well - } + expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)); } public void testMockUnresponsiveRule() throws IOException { @@ -1519,12 +1512,7 @@ public void handleException(TransportException exp) { // all is well } - try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) { - serviceB.handshake(connection, 100); - fail("exception should be thrown"); - } catch (IllegalStateException e) { - // all is well - } + expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)); }