Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.elasticsearch.transport.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -71,8 +73,14 @@ public void addCloseListener(ActionListener<Void> listener) {
}

@Override
public void setSoLinger(int value) {
channel.config().setOption(ChannelOption.SO_LINGER, value);
public void setSoLinger(int value) throws IOException {
if (channel.isOpen()) {
try {
channel.config().setOption(ChannelOption.SO_LINGER, value);
} catch (ChannelException e) {
throw new IOException(e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.netty4;

import java.net.SocketException;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -101,8 +102,13 @@ public void testThatNothingIsReturnedForOtherInvalidPackets() throws Exception {
socket.getOutputStream().write("FOOBAR".getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();

// end of stream
assertThat(socket.getInputStream().read(), is(-1));
try {
// end of stream
assertThat(socket.getInputStream().read(), is(-1));
} catch (SocketException e) {
// On some systems the RST from the other end leads to an exception instead of reading EOF.
assertThat(e.getMessage(), is("Connection reset"));
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/transport/TcpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,18 @@ static void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<Void>>
}
}

/**
* Aborts a channel's connection with RST, then closes the channel.
*
* @param channel Channel to disconnect with RST and close
*/
static void rstAndClose(TcpChannel channel) {
try {
channel.setSoLinger(0);
} catch (IOException e) {
// We ignore the exception as we are closing with RST because of an error.
}
CloseableChannel.closeChannel(channel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ protected final void doStop() {
public void onException(TcpChannel channel, Exception e) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
TcpChannel.rstAndClose(channel);
return;
}

Expand Down Expand Up @@ -815,7 +815,7 @@ protected void innerOnFailure(Exception e) {
} else {
logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e);
// close the channel, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
TcpChannel.rstAndClose(channel);
}
}

Expand Down