Skip to content

Commit c651c89

Browse files
committed
Update MockTransportService to the age of Transport.Connection (#25320)
MockTransportServices allows us to simulate network disruptions in our testing infra. Sadly it wasn't updated to the state of the art in Transport land. This PR brings it up to speed. Specifically: 1) Opening a connection is now also blocked (before only node connections were blocked) 2) Simplifies things using the latest connection based notification between TcpTransport and TransportService for when a disconnect happens. 3) By 2, it fixes a race condition where we may fail to respond to a sent request when it is sent concurrently with the closing of a connection. The old code relied on a node based bridge between tcp transport and transport service. Sadly, the following doesn't work any more: ``` if (transport.nodeConnected(node)) { // this a connected node, disconnecting from it will be up the exception transport.disconnectFromNode(node); <-- this may now be a noop and it doesn't mean that the transport service was notified of the disconnect between the nodeConnected check and here. } else { throw new ConnectTransportException(node, reason, e); } ```
1 parent 2468f18 commit c651c89

File tree

3 files changed

+39
-54
lines changed

3 files changed

+39
-54
lines changed

core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.transport.ActionNotFoundTransportException;
4848
import org.elasticsearch.transport.ConnectTransportException;
4949
import org.elasticsearch.transport.ConnectionProfile;
50-
import org.elasticsearch.transport.NodeDisconnectedException;
5150
import org.elasticsearch.transport.NodeNotConnectedException;
5251
import org.elasticsearch.transport.RemoteTransportException;
5352
import org.elasticsearch.transport.RequestHandlerRegistry;
@@ -253,6 +252,7 @@ public Object getCacheKey() {
253252
@Override
254253
public void close() throws IOException {
255254
transportServiceAdapter.onConnectionClosed(this);
255+
LocalTransport.this.disconnectFromNode(node);
256256
}
257257
};
258258
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.common.util.BigArrays;
4040
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4141
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
42-
import org.elasticsearch.indices.breaker.CircuitBreakerService;
4342
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
4443
import org.elasticsearch.node.Node;
4544
import org.elasticsearch.plugins.Plugin;
@@ -263,10 +262,17 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
263262
}
264263
}
265264

265+
@Override
266+
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
267+
throw new ConnectTransportException(node, "DISCONNECT: simulated");
268+
}
269+
266270
@Override
267271
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
268272
TransportRequestOptions options) throws IOException {
269-
simulateDisconnect(connection, original, "DISCONNECT: simulated");
273+
connection.close();
274+
// send the request, which will blow up
275+
connection.sendRequest(requestId, action, request, options);
270276
}
271277
});
272278
}
@@ -301,19 +307,12 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final
301307

302308
addDelegate(transportAddress, new DelegateTransport(original) {
303309

304-
@Override
305-
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
306-
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
307-
throws ConnectTransportException {
308-
original.connectToNode(node, connectionProfile, connectionValidator);
309-
}
310-
311310
@Override
312311
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
313312
TransportRequestOptions options) throws IOException {
314313
if (blockedActions.contains(action)) {
315314
logger.info("--> preventing {} request", action);
316-
simulateDisconnect(connection, original, "DISCONNECT: prevented " + action + " request");
315+
connection.close();
317316
}
318317
connection.sendRequest(requestId, action, request, options);
319318
}
@@ -347,6 +346,11 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
347346
}
348347
}
349348

349+
@Override
350+
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
351+
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
352+
}
353+
350354
@Override
351355
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
352356
TransportRequestOptions options) throws IOException {
@@ -413,6 +417,28 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
413417
}
414418
}
415419

420+
@Override
421+
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
422+
TimeValue delay = getDelay();
423+
if (delay.millis() <= 0) {
424+
return original.openConnection(node, profile);
425+
}
426+
427+
// TODO: Replace with proper setting
428+
TimeValue connectingTimeout = NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
429+
try {
430+
if (delay.millis() < connectingTimeout.millis()) {
431+
Thread.sleep(delay.millis());
432+
return original.openConnection(node, profile);
433+
} else {
434+
Thread.sleep(connectingTimeout.millis());
435+
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
436+
}
437+
} catch (InterruptedException e) {
438+
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
439+
}
440+
}
441+
416442
@Override
417443
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
418444
TransportRequestOptions options) throws IOException {
@@ -494,37 +520,6 @@ private LookupTestTransport transport() {
494520
return (LookupTestTransport) transport;
495521
}
496522

497-
/**
498-
* simulates a disconnect by disconnecting from the underlying transport and throwing a
499-
* {@link ConnectTransportException}
500-
*/
501-
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason) {
502-
simulateDisconnect(node, transport, reason, null);
503-
}
504-
505-
/**
506-
* simulates a disconnect by disconnecting from the underlying transport and throwing a
507-
* {@link ConnectTransportException}, due to a specific cause exception
508-
*/
509-
private void simulateDisconnect(DiscoveryNode node, Transport transport, String reason, @Nullable Throwable e) {
510-
if (transport.nodeConnected(node)) {
511-
// this a connected node, disconnecting from it will be up the exception
512-
transport.disconnectFromNode(node);
513-
} else {
514-
throw new ConnectTransportException(node, reason, e);
515-
}
516-
}
517-
518-
/**
519-
* simulates a disconnect by closing the connection and throwing a
520-
* {@link ConnectTransportException}
521-
*/
522-
private void simulateDisconnect(Transport.Connection connection, Transport transport, String reason) throws IOException {
523-
connection.close();
524-
simulateDisconnect(connection.getNode(), transport, reason);
525-
}
526-
527-
528523
/**
529524
* A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
530525
* if none is registered, then the default one is used.

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,12 +1472,7 @@ public void handleException(TransportException exp) {
14721472
// all is well
14731473
}
14741474

1475-
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
1476-
serviceB.handshake(connection, 100);
1477-
fail("exception should be thrown");
1478-
} catch (IllegalStateException e) {
1479-
// all is well
1480-
}
1475+
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
14811476
}
14821477

14831478
public void testMockUnresponsiveRule() throws IOException {
@@ -1528,12 +1523,7 @@ public void handleException(TransportException exp) {
15281523
// all is well
15291524
}
15301525

1531-
try (Transport.Connection connection = serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE)){
1532-
serviceB.handshake(connection, 100);
1533-
fail("exception should be thrown");
1534-
} catch (IllegalStateException e) {
1535-
// all is well
1536-
}
1526+
expectThrows(ConnectTransportException.class, () -> serviceB.openConnection(nodeA, MockTcpTransport.LIGHT_PROFILE));
15371527
}
15381528

15391529

0 commit comments

Comments
 (0)