Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
}

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
throw new ConnectTransportException(node, "DISCONNECT: simulated");
}

@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
simulateDisconnect(connection, original, "DISCONNECT: simulated");
connection.close();
// send the request, which will blow up
connection.sendRequest(requestId, action, request, options);
}
});
}
Expand Down Expand Up @@ -256,19 +263,12 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final

addDelegate(transportAddress, new DelegateTransport(original) {

@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
original.connectToNode(node, connectionProfile, connectionValidator);
}

@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (blockedActions.contains(action)) {
logger.info("--> preventing {} request", action);
simulateDisconnect(connection, original, "DISCONNECT: prevented " + action + " request");
connection.close();
}
connection.sendRequest(requestId, action, request, options);
}
Expand Down Expand Up @@ -302,6 +302,11 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
}

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}

@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
Expand Down Expand Up @@ -368,6 +373,28 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
}

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
TimeValue delay = getDelay();
if (delay.millis() <= 0) {
return original.openConnection(node, profile);
}

// TODO: Replace with proper setting
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
return original.openConnection(node, profile);
} else {
Thread.sleep(connectingTimeout.millis());
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
} catch (InterruptedException e) {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
}

@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
Expand Down Expand Up @@ -449,37 +476,6 @@ private LookupTestTransport transport() {
return (LookupTestTransport) transport;
}

/**
* simulates a disconnect by disconnecting from the underlying transport and throwing a
* {@link ConnectTransportException}
*/
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason) {
simulateDisconnect(node, transport, reason, null);
}

/**
* simulates a disconnect by disconnecting from the underlying transport and throwing a
* {@link ConnectTransportException}, due to a specific cause exception
*/
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason, @Nullable Throwable e) {
if (transport.nodeConnected(node)) {
// this a connected node, disconnecting from it will be up the exception
transport.disconnectFromNode(node);
} else {
throw new ConnectTransportException(node, reason, e);
}
}

/**
* simulates a disconnect by closing the connection and throwing a
* {@link ConnectTransportException}
*/
private void simulateDisconnect(Transport.Connection connection, Transport transport, String reason) throws IOException {
connection.close();
simulateDisconnect(connection.getNode(), transport, reason);
}


/**
* A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
* if none is registered, then the default one is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.node.Node;
Expand All @@ -61,7 +60,6 @@
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1463,12 +1461,7 @@ public void handleException(TransportException exp) {
// all is well
}

try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well
}
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
}

public void testMockUnresponsiveRule() throws IOException {
Expand Down Expand Up @@ -1519,12 +1512,7 @@ public void handleException(TransportException exp) {
// all is well
}

try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)) {
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well
}
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
}


Expand Down