diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index d6132b26b0899..bee98362e0c1e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -20,8 +20,10 @@ package org.elasticsearch.transport.netty4; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; +import java.io.IOException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; @@ -71,8 +73,14 @@ public void addCloseListener(ActionListener listener) { } @Override - public void setSoLinger(int value) { - channel.config().setOption(ChannelOption.SO_LINGER, value); + public void setSoLinger(int value) throws IOException { + if (channel.isOpen()) { + try { + channel.config().setOption(ChannelOption.SO_LINGER, value); + } catch (ChannelException e) { + throw new IOException(e); + } + } } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 564cf61a39569..d8b3389f3bec3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty4; +import java.net.SocketException; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -101,8 +102,13 @@ public void testThatNothingIsReturnedForOtherInvalidPackets() throws Exception { socket.getOutputStream().write("FOOBAR".getBytes(StandardCharsets.UTF_8)); socket.getOutputStream().flush(); - // end of stream - assertThat(socket.getInputStream().read(), is(-1)); + try { + // end of stream + assertThat(socket.getInputStream().read(), is(-1)); + } catch (SocketException e) { + // On some systems the RST from the other end leads to an exception instead of reading EOF. + assertThat(e.getMessage(), is("Connection reset")); + } } } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index bc5cc2c92f2cb..f25ae7b67a8ae 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -118,4 +118,18 @@ static void awaitConnected(DiscoveryNode discoveryNode, List> } } + /** + * Aborts a channel's connection with RST, then closes the channel. + * + * @param channel Channel to disconnect with RST and close + */ + static void rstAndClose(TcpChannel channel) { + try { + channel.setSoLinger(0); + } catch (IOException e) { + // We ignore the exception as we are closing with RST because of an error. + } + CloseableChannel.closeChannel(channel); + } + } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 27b4aa7293e18..73c1b6f343c04 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -765,7 +765,7 @@ protected final void doStop() { public void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources - CloseableChannel.closeChannel(channel); + TcpChannel.rstAndClose(channel); return; } @@ -815,7 +815,7 @@ protected void innerOnFailure(Exception e) { } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant - CloseableChannel.closeChannel(channel); + TcpChannel.rstAndClose(channel); } }