Skip to content

Commit a7c3521

Browse files
original-brownbears1monw
authored andcommitted
#26701 Close TcpTransport on RST in some Spots to Prevent Leaking TIME_WAIT Sockets (#26764)
#26701 Added option to RST instead of FIN to TcpTransport#closeChannels
1 parent b7c4785 commit a7c3521

File tree

5 files changed

+31
-13
lines changed

5 files changed

+31
-13
lines changed

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ public Channel channel(TransportRequestOptions.Type type) {
442442
public void close() throws IOException {
443443
if (closed.compareAndSet(false, true)) {
444444
try {
445-
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false);
445+
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true);
446446
} finally {
447447
transportService.onConnectionClosed(this);
448448
}
@@ -640,7 +640,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
640640
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
641641
if (isOpen(channel)) {
642642
try {
643-
closeChannels(Collections.singletonList(channel), false);
643+
closeChannels(Collections.singletonList(channel), false, false);
644644
} catch (IOException e) {
645645
logger.warn("failed to close channel", e);
646646
}
@@ -902,7 +902,7 @@ protected final void doStop() {
902902
// first stop to accept any incoming connections so nobody can connect to this transport
903903
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
904904
try {
905-
closeChannels(entry.getValue(), true);
905+
closeChannels(entry.getValue(), true, true);
906906
} catch (Exception e) {
907907
logger.debug(
908908
(Supplier<?>) () -> new ParameterizedMessage(
@@ -975,7 +975,7 @@ protected void onException(Channel channel, Exception e) {
975975
@Override
976976
protected void innerInnerOnResponse(Channel channel) {
977977
try {
978-
closeChannels(Collections.singletonList(channel), false);
978+
closeChannels(Collections.singletonList(channel), false, false);
979979
} catch (IOException e1) {
980980
logger.debug("failed to close httpOnTransport channel", e1);
981981
}
@@ -984,7 +984,7 @@ protected void innerInnerOnResponse(Channel channel) {
984984
@Override
985985
protected void innerOnFailure(Exception e) {
986986
try {
987-
closeChannels(Collections.singletonList(channel), false);
987+
closeChannels(Collections.singletonList(channel), false, false);
988988
} catch (IOException e1) {
989989
e.addSuppressed(e1);
990990
logger.debug("failed to close httpOnTransport channel", e1);
@@ -1021,8 +1021,9 @@ protected void innerOnFailure(Exception e) {
10211021
*
10221022
* @param channels the channels to close
10231023
* @param blocking whether the channels should be closed synchronously
1024+
* @param closingTransport whether we abort the connection on RST instead of FIN
10241025
*/
1025-
protected abstract void closeChannels(List<Channel> channels, boolean blocking) throws IOException;
1026+
protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException;
10261027

10271028
/**
10281029
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception

core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ protected Object bind(String name, InetSocketAddress address) throws IOException
191191
}
192192

193193
@Override
194-
protected void closeChannels(List channel, boolean blocking) throws IOException {
194+
protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException {
195195

196196
}
197197

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,12 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList
331331
}
332332

333333
@Override
334-
protected void closeChannels(final List<Channel> channels, boolean blocking) throws IOException {
334+
protected void closeChannels(final List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException {
335+
if (closingTransport) {
336+
for (Channel channel : channels) {
337+
channel.config().setOption(ChannelOption.SO_LINGER, 0);
338+
}
339+
}
335340
if (blocking) {
336341
Netty4Utils.closeChannels(channels);
337342
} else {

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ protected InetSocketAddress getLocalAddress(MockChannel mockChannel) {
117117
@Override
118118
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
119119
MockServerSocket socket = new MockServerSocket();
120-
socket.bind(address);
121120
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
122121
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
123122
if (tcpReceiveBufferSize.getBytes() > 0) {
124123
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
125124
}
125+
socket.bind(address);
126126
MockChannel serverMockChannel = new MockChannel(socket, name);
127127
CountDownLatch started = new CountDownLatch(1);
128128
executor.execute(new AbstractRunnable() {
@@ -242,8 +242,15 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac
242242
}
243243

244244
@Override
245-
protected void closeChannels(List<MockChannel> channel, boolean blocking) throws IOException {
246-
IOUtils.close(channel);
245+
protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
246+
if (closingTransport) {
247+
for (MockChannel channel : channels) {
248+
if (channel.activeChannel != null) {
249+
channel.activeChannel.setSoLinger(true, 0);
250+
}
251+
}
252+
}
253+
IOUtils.close(channels);
247254
}
248255

249256
@Override

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport.nio;
2121

22+
import java.net.StandardSocketOptions;
2223
import org.elasticsearch.ElasticsearchException;
2324
import org.elasticsearch.ExceptionsHelper;
2425
import org.elasticsearch.action.ActionListener;
@@ -28,7 +29,6 @@
2829
import org.elasticsearch.common.network.NetworkService;
2930
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
31-
import org.elasticsearch.common.unit.ByteSizeValue;
3232
import org.elasticsearch.common.util.BigArrays;
3333
import org.elasticsearch.common.util.concurrent.EsExecutors;
3434
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@@ -99,7 +99,12 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th
9999
}
100100

101101
@Override
102-
protected void closeChannels(List<NioChannel> channels, boolean blocking) throws IOException {
102+
protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
103+
if (closingTransport) {
104+
for (NioChannel channel : channels) {
105+
channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0);
106+
}
107+
}
103108
ArrayList<CloseFuture> futures = new ArrayList<>(channels.size());
104109
for (final NioChannel channel : channels) {
105110
if (channel != null && channel.isOpen()) {

0 commit comments

Comments
 (0)