Skip to content

Commit 9dc0ca0

Browse files
Fix Blackholed Connection Behavior in DisruptableMockTransport (#61310)
It is not realistic to drop messages without eventually failing. To retain the coverage of long pauses this PR adjusts the blackholed behavior to fail a send after 24h (which is assumed to be longer than any timeout in the system) instead of never. Closes #61034
1 parent 717db9c commit 9dc0ca0

File tree

4 files changed

+28
-11
lines changed

4 files changed

+28
-11
lines changed

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1417,7 +1417,7 @@ protected void connectToNodesAndWait(ClusterState newClusterState) {
14171417
}
14181418
});
14191419
recoverySettings = new RecoverySettings(settings, clusterSettings);
1420-
mockTransport = new DisruptableMockTransport(node, logger) {
1420+
mockTransport = new DisruptableMockTransport(node, logger, deterministicTaskQueue) {
14211421
@Override
14221422
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
14231423
if (node.equals(destination)) {

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ class ClusterNode {
929929

930930
private void setUp() {
931931
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
932-
mockTransport = new DisruptableMockTransport(localNode, logger) {
932+
mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) {
933933
@Override
934934
protected void execute(Runnable runnable) {
935935
deterministicTaskQueue.scheduleNow(onNode(runnable));

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.settings.ClusterSettings;
@@ -44,17 +45,20 @@
4445
import java.io.IOException;
4546
import java.util.Optional;
4647
import java.util.Set;
48+
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Function;
4850

4951
import static org.elasticsearch.test.ESTestCase.copyWriteable;
5052

5153
public abstract class DisruptableMockTransport extends MockTransport {
5254
private final DiscoveryNode localNode;
5355
private final Logger logger;
56+
private final DeterministicTaskQueue deterministicTaskQueue;
5457

55-
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger) {
58+
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger, DeterministicTaskQueue deterministicTaskQueue) {
5659
this.localNode = localNode;
5760
this.logger = logger;
61+
this.deterministicTaskQueue = deterministicTaskQueue;
5862
}
5963

6064
protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode destination);
@@ -159,6 +163,9 @@ protected String getRequestDescription(long requestId, String action, DiscoveryN
159163

160164
protected void onBlackholedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
161165
logger.trace("dropping {}", getRequestDescription(requestId, action, destinationTransport.getLocalNode()));
166+
// Delaying the request for one day and then disconnect to simulate a very long pause
167+
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + TimeUnit.DAYS.toMillis(1L),
168+
() -> onDisconnectedDuringSend(requestId, action, destinationTransport));
162169
}
163170

164171
protected void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
@@ -199,7 +206,8 @@ public void run() {
199206

200207
case BLACK_HOLE:
201208
case DISCONNECTED:
202-
logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
209+
logger.trace("delaying response to {}: channel is {}", requestDescription, connectionStatus);
210+
onBlackholedDuringSend(requestId, action, destinationTransport);
203211
break;
204212

205213
default:
@@ -229,7 +237,9 @@ public void run() {
229237

230238
case BLACK_HOLE:
231239
case DISCONNECTED:
232-
logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
240+
logger.trace("delaying exception response to {}: channel is {}",
241+
requestDescription, connectionStatus);
242+
onBlackholedDuringSend(requestId, action, destinationTransport);
233243
break;
234244

235245
default:

test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
5858
import static org.hamcrest.Matchers.containsString;
5959
import static org.hamcrest.Matchers.endsWith;
60+
import static org.hamcrest.Matchers.instanceOf;
6061

6162
public class DisruptableMockTransportTests extends ESTestCase {
6263

@@ -103,7 +104,7 @@ public void initTransports() {
103104
deterministicTaskQueue = new DeterministicTaskQueue(
104105
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
105106

106-
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
107+
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger, deterministicTaskQueue) {
107108
@Override
108109
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
109110
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -120,7 +121,7 @@ protected void execute(Runnable runnable) {
120121
}
121122
};
122123

123-
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
124+
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger, deterministicTaskQueue) {
124125
@Override
125126
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
126127
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -318,27 +319,33 @@ public void testDisconnectedOnSuccessfulResponse() throws IOException {
318319
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
319320
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
320321

321-
send(service1, node2, responseHandlerShouldNotBeCalled());
322+
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
323+
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
322324
deterministicTaskQueue.runAllRunnableTasks();
323325
assertNotNull(responseHandlerChannel.get());
326+
assertNull(responseHandlerException.get());
324327

325328
disconnectedLinks.add(Tuple.tuple(node2, node1));
326329
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
327-
deterministicTaskQueue.runAllRunnableTasks();
330+
deterministicTaskQueue.runAllTasks();
331+
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
328332
}
329333

330334
public void testDisconnectedOnExceptionalResponse() throws IOException {
331335
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
332336
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
333337
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
334338

335-
send(service1, node2, responseHandlerShouldNotBeCalled());
339+
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
340+
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
336341
deterministicTaskQueue.runAllRunnableTasks();
337342
assertNotNull(responseHandlerChannel.get());
343+
assertNull(responseHandlerException.get());
338344

339345
disconnectedLinks.add(Tuple.tuple(node2, node1));
340346
responseHandlerChannel.get().sendResponse(new Exception());
341-
deterministicTaskQueue.runAllRunnableTasks();
347+
deterministicTaskQueue.runAllTasks();
348+
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
342349
}
343350

344351
public void testUnavailableOnSuccessfulResponse() throws IOException {

0 commit comments

Comments
 (0)