Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ protected void connectToNodesAndWait(ClusterState newClusterState) {
}
});
recoverySettings = new RecoverySettings(settings, clusterSettings);
mockTransport = new DisruptableMockTransport(node, logger) {
mockTransport = new DisruptableMockTransport(node, logger, deterministicTaskQueue) {
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
if (node.equals(destination)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ class ClusterNode {

private void setUp() {
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
mockTransport = new DisruptableMockTransport(localNode, logger) {
mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) {
@Override
protected void execute(Runnable runnable) {
deterministicTaskQueue.scheduleNow(onNode(runnable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -44,17 +45,20 @@
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.test.ESTestCase.copyWriteable;

public abstract class DisruptableMockTransport extends MockTransport {
private final DiscoveryNode localNode;
private final Logger logger;
private final DeterministicTaskQueue deterministicTaskQueue;

public DisruptableMockTransport(DiscoveryNode localNode, Logger logger) {
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger, DeterministicTaskQueue deterministicTaskQueue) {
this.localNode = localNode;
this.logger = logger;
this.deterministicTaskQueue = deterministicTaskQueue;
}

protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode destination);
Expand Down Expand Up @@ -159,6 +163,9 @@ protected String getRequestDescription(long requestId, String action, DiscoveryN

protected void onBlackholedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
logger.trace("dropping {}", getRequestDescription(requestId, action, destinationTransport.getLocalNode()));
// Delaying the request for one day and then disconnect to simulate a very long pause
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + TimeUnit.DAYS.toMillis(1L),
() -> onDisconnectedDuringSend(requestId, action, destinationTransport));
}

protected void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
Expand Down Expand Up @@ -199,7 +206,8 @@ public void run() {

case BLACK_HOLE:
case DISCONNECTED:
logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
logger.trace("delaying response to {}: channel is {}", requestDescription, connectionStatus);
onBlackholedDuringSend(requestId, action, destinationTransport);
break;

default:
Expand Down Expand Up @@ -229,7 +237,9 @@ public void run() {

case BLACK_HOLE:
case DISCONNECTED:
logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
logger.trace("delaying exception response to {}: channel is {}",
requestDescription, connectionStatus);
onBlackholedDuringSend(requestId, action, destinationTransport);
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.instanceOf;

public class DisruptableMockTransportTests extends ESTestCase {

Expand Down Expand Up @@ -103,7 +104,7 @@ public void initTransports() {
deterministicTaskQueue = new DeterministicTaskQueue(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());

final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger, deterministicTaskQueue) {
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
Expand All @@ -120,7 +121,7 @@ protected void execute(Runnable runnable) {
}
};

final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger, deterministicTaskQueue) {
@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
Expand Down Expand Up @@ -318,27 +319,33 @@ public void testDisconnectedOnSuccessfulResponse() throws IOException {
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));

send(service1, node2, responseHandlerShouldNotBeCalled());
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
deterministicTaskQueue.runAllRunnableTasks();
assertNotNull(responseHandlerChannel.get());
assertNull(responseHandlerException.get());

disconnectedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.runAllTasks();
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
}

public void testDisconnectedOnExceptionalResponse() throws IOException {
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));

send(service1, node2, responseHandlerShouldNotBeCalled());
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
deterministicTaskQueue.runAllRunnableTasks();
assertNotNull(responseHandlerChannel.get());
assertNull(responseHandlerException.get());

disconnectedLinks.add(Tuple.tuple(node2, node1));
responseHandlerChannel.get().sendResponse(new Exception());
deterministicTaskQueue.runAllRunnableTasks();
deterministicTaskQueue.runAllTasks();
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
}

public void testUnavailableOnSuccessfulResponse() throws IOException {
Expand Down