diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0f9b25cc9c617..dcf9b3595c80e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -46,7 +46,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -519,37 +518,57 @@ public void removeConnectionListener(TransportConnectionListener listener) { public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); } public final void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, options, handler); } + /** + * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked. + * + * @param connection the connection to send the request on + * @param action the name of the action + * @param request the request + * @param options the options for this request + * @param handler the response handler + * @param the type of the transport response + */ public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { try { asyncSender.sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + } catch (final Exception ex) { // the caller might not handle this so we invoke the handler - handler.handleException(ex); + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); } } @@ -569,13 +588,15 @@ public final void sendChildRequest(final Discovery final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendChildRequest(connection, action, request, parentTask, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendChildRequest(connection, action, request, parentTask, options, handler); } public void sendChildRequest(final Transport.Connection connection, final String action, @@ -589,16 +610,7 @@ public void sendChildRequest(final Transport.Conne final TransportRequestOptions options, final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); - try { - sendRequest(connection, action, request, options, handler); - } catch (TaskCancelledException ex) { - // The parent task is already cancelled - just fail the request - handler.handleException(new TransportException(ex)); - } catch (NodeNotConnectedException ex) { - // the caller might not handle this so we invoke the handler - handler.handleException(ex); - } - + sendRequest(connection, action, request, options, handler); } private void sendRequestInternal(final Transport.Connection connection, final String action, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f90727c936e00..0867bb4abec1d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR); + } + + public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, + @Nullable ClusterSettings clusterSettings, Set taskHeaders, + TransportInterceptor interceptor) { + return new MockTransportService(settings, transport, threadPool, interceptor, boundAddress -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index c0fd76d7f753b..0a89b2811b1a6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -94,12 +94,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @@ -187,7 +190,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti } private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings, - Settings settings, boolean acceptRequests, boolean doHandshake) { + Settings settings, boolean acceptRequests, boolean doHandshake, + TransportInterceptor interceptor) { Settings updatedSettings = Settings.builder() .put(TransportSettings.PORT.getKey(), getPortRange()) .put(settings) @@ -198,7 +202,7 @@ private MockTransportService buildService(final String name, final Version versi } Transport transport = build(updatedSettings, version, clusterSettings, doHandshake); MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool, - clusterSettings, Collections.emptySet()); + clusterSettings, Collections.emptySet(), interceptor); service.start(); if (acceptRequests) { service.acceptIncomingRequests(); @@ -206,6 +210,11 @@ private MockTransportService buildService(final String name, final Version versi return service; } + private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings, + Settings settings, boolean acceptRequests, boolean doHandshake) { + return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR); + } + protected MockTransportService buildService(final String name, final Version version, Settings settings) { return buildService(name, version, null, settings); } @@ -2744,6 +2753,95 @@ public void onConnectionClosed(Transport.Connection connection) { } } + // test that the response handler is invoked on a failure to send + public void testFailToSend() throws InterruptedException { + final RuntimeException failToSendException; + if (randomBoolean()) { + failToSendException = new IllegalStateException("fail to send"); + } else { + failToSendException = new TransportException("fail to send"); + } + final TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public AsyncSender interceptSender(final AsyncSender sender) { + return new AsyncSender() { + @Override + public void sendRequest( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler) { + if ("fail-to-send-action".equals(action)) { + throw failToSendException; + } else { + sender.sendRequest(connection, action, request, options, handler); + } + } + }; + } + }; + try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) { + serviceC.start(); + serviceC.acceptIncomingRequests(); + final CountDownLatch latch = new CountDownLatch(1); + serviceC.connectToNode( + serviceA.getLocalDiscoNode(), + ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY), + new ActionListener<>() { + @Override + public void onResponse(final Void v) { + latch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.getMessage()); + } + }); + latch.await(); + final AtomicReference te = new AtomicReference<>(); + final Transport.Connection connection = serviceC.getConnection(nodeA); + serviceC.sendRequest( + connection, + "fail-to-send-action", + TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public void handleResponse(final TransportResponse response) { + fail("handle response should not be invoked"); + } + + @Override + public void handleException(final TransportException exp) { + te.set(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse read(final StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } + }); + assertThat(te.get(), not(nullValue())); + + if (failToSendException instanceof IllegalStateException) { + assertThat(te.get().getMessage(), equalTo("failure to send")); + assertThat(te.get().getCause(), instanceOf(IllegalStateException.class)); + assertThat(te.get().getCause().getMessage(), equalTo("fail to send")); + } else { + assertThat(te.get().getMessage(), equalTo("fail to send")); + assertThat(te.get().getCause(), nullValue()); + } + } + + } + private void closeConnectionChannel(Transport.Connection connection) { StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection; TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();