Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public Channel channel(TransportRequestOptions.Type type) {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false);
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()), false, true);
} finally {
transportService.onConnectionClosed(this);
}
Expand Down Expand Up @@ -640,7 +640,7 @@ private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels n
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
if (isOpen(channel)) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e) {
logger.warn("failed to close channel", e);
}
Expand Down Expand Up @@ -902,7 +902,7 @@ protected final void doStop() {
// first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
try {
closeChannels(entry.getValue(), true);
closeChannels(entry.getValue(), true, true);
} catch (Exception e) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
Expand Down Expand Up @@ -975,7 +975,7 @@ protected void onException(Channel channel, Exception e) {
@Override
protected void innerInnerOnResponse(Channel channel) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) {
logger.debug("failed to close httpOnTransport channel", e1);
}
Expand All @@ -984,7 +984,7 @@ protected void innerInnerOnResponse(Channel channel) {
@Override
protected void innerOnFailure(Exception e) {
try {
closeChannels(Collections.singletonList(channel), false);
closeChannels(Collections.singletonList(channel), false, false);
} catch (IOException e1) {
e.addSuppressed(e1);
logger.debug("failed to close httpOnTransport channel", e1);
Expand Down Expand Up @@ -1021,8 +1021,9 @@ protected void innerOnFailure(Exception e) {
*
* @param channels the channels to close
* @param blocking whether the channels should be closed synchronously
* @param closingTransport whether we abort the connection on RST instead of FIN
*/
protected abstract void closeChannels(List<Channel> channels, boolean blocking) throws IOException;
protected abstract void closeChannels(List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException;

/**
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ protected Object bind(String name, InetSocketAddress address) throws IOException
}

@Override
protected void closeChannels(List channel, boolean blocking) throws IOException {
protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList
}

@Override
protected void closeChannels(final List<Channel> channels, boolean blocking) throws IOException {
protected void closeChannels(final List<Channel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (Channel channel : channels) {
channel.config().setOption(ChannelOption.SO_LINGER, 0);
}
}
if (blocking) {
Netty4Utils.closeChannels(channels);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ protected InetSocketAddress getLocalAddress(MockChannel mockChannel) {
@Override
protected MockChannel bind(final String name, InetSocketAddress address) throws IOException {
MockServerSocket socket = new MockServerSocket();
socket.bind(address);
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.get(settings);
if (tcpReceiveBufferSize.getBytes() > 0) {
socket.setReceiveBufferSize(tcpReceiveBufferSize.bytesAsInt());
}
socket.bind(address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense ++

MockChannel serverMockChannel = new MockChannel(socket, name);
CountDownLatch started = new CountDownLatch(1);
executor.execute(new AbstractRunnable() {
Expand Down Expand Up @@ -242,8 +242,15 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac
}

@Override
protected void closeChannels(List<MockChannel> channel, boolean blocking) throws IOException {
IOUtils.close(channel);
protected void closeChannels(List<MockChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (MockChannel channel : channels) {
if (channel.activeChannel != null) {
channel.activeChannel.setSoLinger(true, 0);
}
}
}
IOUtils.close(channels);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio;

import java.net.StandardSocketOptions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand All @@ -28,7 +29,6 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -99,7 +99,12 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th
}

@Override
protected void closeChannels(List<NioChannel> channels, boolean blocking) throws IOException {
protected void closeChannels(List<NioChannel> channels, boolean blocking, boolean closingTransport) throws IOException {
if (closingTransport) {
for (NioChannel channel : channels) {
channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0);
}
}
ArrayList<CloseFuture> futures = new ArrayList<>(channels.size());
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
Expand Down