Skip to content

Commit bebfa23

Browse files
committed
Check for closed connection while opening
While opening a connection to a node, a channel can subsequently close. If this happens, a future callback whose purpose is to close all other channels and disconnect from the node will fire. However, this future will not be ready to close all the channels because the connection will not be exposed to the future callback yet. Since this callback is run once, we will never try to disconnect from this node again and we will be left with a closed channel. This commit adds a check that all channels are open before exposing the channel and throws a general connection exception. In this case, the usual connection retry logic will take over. Relates #26932
1 parent 9e36c16 commit bebfa23

File tree

9 files changed

+119
-19
lines changed

9 files changed

+119
-19
lines changed

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,9 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
582582
}
583583
transportServiceAdapter.onConnectionOpened(nodeChannels);
584584
connectionRef.set(nodeChannels);
585+
if (Arrays.stream(nodeChannels.channels).allMatch(this::isOpen) == false) {
586+
throw new ConnectTransportException(node, "a channel closed while connecting");
587+
}
585588
success = true;
586589
return nodeChannels;
587590
} catch (ConnectTransportException e) {
@@ -1061,7 +1064,18 @@ public void innerOnFailure(Exception e) {
10611064
*/
10621065
protected abstract void sendMessage(Channel channel, BytesReference reference, ActionListener<Channel> listener);
10631066

1064-
protected abstract NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile connectionProfile,
1067+
/**
1068+
* Connect to the node with channels as defined by the specified connection profile. Implementations must invoke the specified channel
1069+
* close callback when a channel is closed.
1070+
*
1071+
* @param node the node to connect to
1072+
* @param connectionProfile the connection profile
1073+
* @param onChannelClose callback to invoke when a channel is closed
1074+
* @return the channels
1075+
* @throws IOException if an I/O exception occurs while opening channels
1076+
*/
1077+
protected abstract NodeChannels connectToChannels(DiscoveryNode node,
1078+
ConnectionProfile connectionProfile,
10651079
Consumer<Channel> onChannelClose) throws IOException;
10661080

10671081
/**

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Objects;
6363
import java.util.concurrent.CopyOnWriteArrayList;
6464
import java.util.concurrent.CountDownLatch;
65+
import java.util.concurrent.ExecutorService;
6566
import java.util.concurrent.ScheduledFuture;
6667
import java.util.function.Function;
6768
import java.util.function.Predicate;
@@ -193,6 +194,15 @@ protected TaskManager createTaskManager() {
193194
return new TaskManager(settings);
194195
}
195196

197+
/**
198+
* The executor service for this transport service.
199+
*
200+
* @return the executor service
201+
*/
202+
protected ExecutorService getExecutorService() {
203+
return threadPool.generic();
204+
}
205+
196206
void setTracerLogInclude(List<String> tracerLogInclude) {
197207
this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
198208
}
@@ -240,7 +250,7 @@ protected void doStop() {
240250
if (holderToNotify != null) {
241251
// callback that an exception happened, but on a different thread since we don't
242252
// want handlers to worry about stack overflows
243-
threadPool.generic().execute(new AbstractRunnable() {
253+
getExecutorService().execute(new AbstractRunnable() {
244254
@Override
245255
public void onRejection(Exception e) {
246256
// if we get rejected during node shutdown we don't wanna bubble it up
@@ -855,7 +865,7 @@ public void onNodeConnected(final DiscoveryNode node) {
855865
// connectToNode(); connection is completed successfully
856866
// addConnectionListener(); this listener shouldn't be called
857867
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
858-
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
868+
getExecutorService().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
859869
}
860870

861871
@Override
@@ -864,13 +874,13 @@ public void onConnectionOpened(Transport.Connection connection) {
864874
// connectToNode(); connection is completed successfully
865875
// addConnectionListener(); this listener shouldn't be called
866876
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
867-
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
877+
getExecutorService().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
868878
}
869879

870880
@Override
871881
public void onNodeDisconnected(final DiscoveryNode node) {
872882
try {
873-
threadPool.generic().execute(() -> {
883+
getExecutorService().execute( () -> {
874884
for (final TransportConnectionListener connectionListener : connectionListeners) {
875885
connectionListener.onNodeDisconnected(node);
876886
}
@@ -890,7 +900,7 @@ public void onConnectionClosed(Transport.Connection connection) {
890900
if (holderToNotify != null) {
891901
// callback that an exception happened, but on a different thread since we don't
892902
// want handlers to worry about stack overflows
893-
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
903+
getExecutorService().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
894904
connection.getNode(), holderToNotify.action())));
895905
}
896906
}

core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ protected void sendMessage(Object o, BytesReference reference, ActionListener li
204204
}
205205

206206
@Override
207-
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile,
208-
Consumer onChannelClose) throws IOException {
207+
protected NodeChannels connectToChannels(
208+
DiscoveryNode node, ConnectionProfile profile, Consumer onChannelClose) throws IOException {
209209
return new NodeChannels(node, new Object[profile.getNumConnections()], profile);
210210
}
211211

modules/transport-netty3/src/test/java/org/elasticsearch/transport/netty3/SimpleNetty3TransportTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3636
import org.elasticsearch.transport.BindTransportException;
3737
import org.elasticsearch.transport.ConnectTransportException;
38+
import org.elasticsearch.transport.TcpTransport;
3839
import org.elasticsearch.transport.Transport;
3940
import org.elasticsearch.transport.TransportService;
4041
import org.elasticsearch.transport.TransportSettings;
@@ -94,6 +95,13 @@ protected MockTransportService build(Settings settings, Version version, Cluster
9495
return transportService;
9596
}
9697

98+
@Override
99+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
100+
final Netty3Transport t = (Netty3Transport) transport;
101+
@SuppressWarnings("unchecked") final TcpTransport<Channel>.NodeChannels channels = (TcpTransport<Channel>.NodeChannels) connection;
102+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
103+
}
104+
97105
public void testConnectException() throws UnknownHostException {
98106
try {
99107
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9876),

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3737
import org.elasticsearch.transport.BindTransportException;
3838
import org.elasticsearch.transport.ConnectTransportException;
39+
import org.elasticsearch.transport.TcpTransport;
3940
import org.elasticsearch.transport.Transport;
4041
import org.elasticsearch.transport.TransportService;
4142
import org.elasticsearch.transport.TransportSettings;
@@ -52,7 +53,7 @@
5253
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
5354

5455
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
55-
ClusterSettings clusterSettings, boolean doHandshake) {
56+
ClusterSettings clusterSettings, boolean doHandshake) {
5657
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
5758
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
5859
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@@ -86,6 +87,13 @@ protected MockTransportService build(Settings settings, Version version, Cluster
8687
return transportService;
8788
}
8889

90+
@Override
91+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
92+
final Netty4Transport t = (Netty4Transport) transport;
93+
@SuppressWarnings("unchecked") final TcpTransport<Channel>.NodeChannels channels = (TcpTransport<Channel>.NodeChannels) connection;
94+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
95+
}
96+
8997
public void testConnectException() throws UnknownHostException {
9098
try {
9199
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9876),
@@ -108,7 +116,8 @@ public void testBindUnavailableAddress() {
108116
.build();
109117
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
110118
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
111-
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
119+
MockTransportService transportService =
120+
nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
112121
try {
113122
transportService.start();
114123
} finally {

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Set;
7373
import java.util.concurrent.ConcurrentMap;
7474
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.ExecutorService;
7576
import java.util.concurrent.LinkedBlockingDeque;
7677
import java.util.concurrent.atomic.AtomicBoolean;
7778
import java.util.function.Function;
@@ -179,6 +180,17 @@ protected TaskManager createTaskManager() {
179180
}
180181
}
181182

183+
private volatile String executorName;
184+
185+
public void setExecutorName(final String executorName) {
186+
this.executorName = executorName;
187+
}
188+
189+
@Override
190+
protected ExecutorService getExecutorService() {
191+
return executorName == null ? super.getExecutorService() : getThreadPool().executor(executorName);
192+
}
193+
182194
/**
183195
* Clears all the registered rules.
184196
*/

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import org.elasticsearch.common.io.stream.StreamInput;
3535
import org.elasticsearch.common.io.stream.StreamOutput;
3636
import org.elasticsearch.common.network.NetworkService;
37+
import org.elasticsearch.common.network.NetworkUtils;
3738
import org.elasticsearch.common.settings.ClusterSettings;
3839
import org.elasticsearch.common.settings.Settings;
40+
import org.elasticsearch.common.transport.BoundTransportAddress;
3941
import org.elasticsearch.common.transport.InetSocketTransportAddress;
4042
import org.elasticsearch.common.transport.TransportAddress;
4143
import org.elasticsearch.common.unit.TimeValue;
@@ -55,6 +57,8 @@
5557

5658
import java.io.IOException;
5759
import java.io.UncheckedIOException;
60+
import java.net.Inet4Address;
61+
import java.net.Inet6Address;
5862
import java.net.InetAddress;
5963
import java.net.InetSocketAddress;
6064
import java.net.ServerSocket;
@@ -76,11 +80,14 @@
7680
import java.util.concurrent.atomic.AtomicBoolean;
7781
import java.util.concurrent.atomic.AtomicInteger;
7882
import java.util.concurrent.atomic.AtomicReference;
83+
import java.util.stream.Collectors;
7984

8085
import static java.util.Collections.emptyMap;
8186
import static java.util.Collections.emptySet;
87+
import static org.hamcrest.Matchers.containsString;
8288
import static org.hamcrest.Matchers.empty;
8389
import static org.hamcrest.Matchers.equalTo;
90+
import static org.hamcrest.Matchers.hasToString;
8491
import static org.hamcrest.Matchers.instanceOf;
8592
import static org.hamcrest.Matchers.notNullValue;
8693
import static org.hamcrest.Matchers.startsWith;
@@ -142,14 +149,14 @@ public void onNodeDisconnected(DiscoveryNode node) {
142149
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
143150
Settings settings, boolean acceptRequests, boolean doHandshake) {
144151
MockTransportService service = build(
145-
Settings.builder()
146-
.put(settings)
147-
.put(Node.NODE_NAME_SETTING.getKey(), name)
148-
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
149-
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
150-
.build(),
151-
version,
152-
clusterSettings, doHandshake);
152+
Settings.builder()
153+
.put(settings)
154+
.put(Node.NODE_NAME_SETTING.getKey(), name)
155+
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
156+
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
157+
.build(),
158+
version,
159+
clusterSettings, doHandshake);
153160
if (acceptRequests) {
154161
service.acceptIncomingRequests();
155162
}
@@ -2239,4 +2246,33 @@ public String executor() {
22392246
responseLatch.await();
22402247
}
22412248
}
2249+
2250+
public void testChannelCloseWhileConnecting() throws IOException {
2251+
try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) {
2252+
service.setExecutorName(ThreadPool.Names.SAME); // make sure stuff is executed in a blocking fashion
2253+
service.addConnectionListener(new TransportConnectionListener() {
2254+
@Override
2255+
public void onConnectionOpened(final Transport.Connection connection) {
2256+
try {
2257+
closeConnectionChannel(service.getOriginalTransport(), connection);
2258+
} catch (final IOException e) {
2259+
throw new AssertionError(e);
2260+
}
2261+
}
2262+
});
2263+
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
2264+
builder.addConnections(1,
2265+
TransportRequestOptions.Type.BULK,
2266+
TransportRequestOptions.Type.PING,
2267+
TransportRequestOptions.Type.RECOVERY,
2268+
TransportRequestOptions.Type.REG,
2269+
TransportRequestOptions.Type.STATE);
2270+
final ConnectTransportException e =
2271+
expectThrows(ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build()));
2272+
assertThat(e, hasToString(containsString(("a channel closed while connecting"))));
2273+
}
2274+
}
2275+
2276+
protected abstract void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException;
2277+
22422278
}

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
176176
}
177177

178178
@Override
179-
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile,
179+
protected NodeChannels connectToChannels(DiscoveryNode node,
180+
ConnectionProfile profile,
180181
Consumer<MockChannel> onChannelClose) throws IOException {
181182
final MockChannel[] mockChannels = new MockChannel[1];
182183
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here

test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434

3535
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
36+
3637
@Override
3738
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
3839
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
@@ -53,4 +54,13 @@ protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel,
5354
mockTransportService.start();
5455
return mockTransportService;
5556
}
57+
58+
@Override
59+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
60+
final MockTcpTransport t = (MockTcpTransport) transport;
61+
@SuppressWarnings("unchecked") final TcpTransport<MockTcpTransport.MockChannel>.NodeChannels channels =
62+
(TcpTransport<MockTcpTransport.MockChannel>.NodeChannels) connection;
63+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
64+
}
65+
5666
}

0 commit comments

Comments
 (0)