Close connection manager on current thread in RemoteClusterConnection #44805
Pinging @elastic/es-distributed
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
    Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
    if (waitForPendingConnections) {
I'm not sure this is actually safe. If the problem is that we're not safely handling all the pending connections here because of some race with the thread pool, then we're leaking FDs and now have no visibility into that?
Can't we just do:
diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java
index ed26d0b07cd..d87aca31a13 100644
--- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java
+++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java
@@ -41,6 +41,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -255,8 +256,17 @@ public class ConnectionManager implements Closeable {
try {
closeLatch.await();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
+ logger.warn("Interrupted while waiting for in-progress connections to finish.");
+ try {
+ if (closeLatch.await(30L, TimeUnit.SECONDS) == false) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ e.addSuppressed(ie);
+ throw new IllegalStateException(e);
+ }
}
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Something like this? If we are in fact safely shutting down those connections eventually, even when the thread pool is shut down too early, this should still finish properly. If not, we get proper visibility and know we have to fix the shutdown sequence so that it doesn't leak connection attempts here.
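The idea in the suggested diff (after an interrupt, give in-progress work one bounded grace period before giving up) can be sketched in isolation. This is a minimal illustration, not the Elasticsearch code: `BoundedCloseWait` and `awaitWithGrace` are hypothetical names, and unlike the diff the sketch returns a boolean instead of throwing `IllegalStateException` and always restores the interrupt flag once an interrupt has been seen.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Elasticsearch.
final class BoundedCloseWait {

    /**
     * Waits for the latch to reach zero. If the wait is interrupted, waits
     * once more for at most graceMillis. Returns true if the latch reached
     * zero, false if the grace period elapsed or a second interrupt arrived.
     */
    static boolean awaitWithGrace(CountDownLatch latch, long graceMillis) {
        boolean interrupted = false;
        try {
            latch.await();
            return true;
        } catch (InterruptedException first) {
            // await() cleared the interrupt flag when it threw, so the
            // timed wait below can block normally.
            interrupted = true;
            try {
                return latch.await(graceMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException second) {
                return false;
            }
        } finally {
            if (interrupted) {
                // Restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        CountDownLatch alreadyDone = new CountDownLatch(0);
        System.out.println(awaitWithGrace(alreadyDone, 100L));
    }
}
```

A `false` result here corresponds to the case where the diff throws: the pending work did not finish in time, so the caller knows something may have leaked rather than having the failure silently suppressed.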
Actually, looking at the failures linked in the issues, we also have interrupted exceptions during connection establishment. I'll have to look into this some more after the meeting, but I don't think we should just suppress the issue here.
@yannick see this one:
1> [2019-07-15T04:03:40,834][INFO ][o.e.t.TransportService ] [testLazyResolveTransportAddress] publish_address {127.0.0.1:14002}, bound_addresses {127.0.0.1:14002}, {[::1]:14002}
1> [2019-07-15T04:03:40,848][WARN ][o.e.t.RemoteClusterConnection] [org.elasticsearch.transport.RemoteClusterConnectionTests] fetching nodes from external cluster [test-cluster] failed
1> org.elasticsearch.common.util.CancellableThreads$ExecutionCancelledException: operation was cancelled reason [connect handler is closed]
1> at org.elasticsearch.common.util.CancellableThreads.checkForCancel(CancellableThreads.java:65) ~[main/:8.0.0-SNAPSHOT]
1> at org.elasticsearch.common.util.CancellableThreads.executeIO(CancellableThreads.java:130) ~[main/:8.0.0-SNAPSHOT]
1> at org.elasticsearch.transport.RemoteClusterConnection$ConnectHandler.collectRemoteNodes(RemoteClusterConnection.java:443) [main/:?]
1> at org.elasticsearch.transport.RemoteClusterConnection$ConnectHandler$1.doRun(RemoteClusterConnection.java:431) [main/:?]
1> at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [main/:?]
1> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
1> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
1> at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:699) [main/:?]
1> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
1> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
1> at java.lang.Thread.run(Thread.java:834) [?:?]
1> Suppressed: org.elasticsearch.transport.ConnectTransportException: [seed_node][127.0.0.1:14000] general node connection failure
1> at org.elasticsearch.transport.TcpTransport$ChannelsConnectedListener.lambda$onResponse$2(TcpTransport.java:948) ~[main/:?]
1> at org.elasticsearch.action.ActionListener$1.onFailure(ActionListener.java:70) ~[main/:?]
1> at org.elasticsearch.transport.TransportHandshaker$HandshakeResponseHandler.handleLocalException(TransportHandshaker.java:155) ~[main/:?]
1> at org.elasticsearch.transport.TransportHandshaker.lambda$sendHandshake$0(TransportHandshaker.java:67) ~[main/:?]
1> at org.elasticsearch.action.ActionListener.lambda$wrap$0(ActionListener.java:131) ~[main/:?]
1> at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:62) ~[main/:?]
1> at org.elasticsearch.action.ActionListener.lambda$toBiConsumer$3(ActionListener.java:159) ~[main/:?]
1> at org.elasticsearch.common.concurrent.CompletableContext.lambda$addListener$0(CompletableContext.java:39) ~[elasticsearch-core-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
1> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
1> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
1> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
1> at org.elasticsearch.common.concurrent.CompletableContext.complete(CompletableContext.java:61) ~[elasticsearch-core-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.ChannelContext.closeFromSelector(ChannelContext.java:77) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.SocketChannelContext.closeFromSelector(SocketChannelContext.java:170) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.EventHandler.handleClose(EventHandler.java:220) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.transport.nio.TestEventHandler.handleClose(TestEventHandler.java:199) ~[framework-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.NioSelector.closeChannel(NioSelector.java:458) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.NioSelector.queueChannelClose(NioSelector.java:311) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.BytesChannelContext.closeChannel(BytesChannelContext.java:67) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
2> jul. 15, 2019 4:03:51 A. M. com.carrotsearch.randomizedtesting.RandomizedRunner$QueueUncaughtExceptionsHandler uncaughtException
1> at org.elasticsearch.transport.nio.MockNioTransport$MockSocketChannel.close(MockNioTransport.java:304) ~[framework-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.core.internal.io.IOUtils.close(IOUtils.java:104) ~[elasticsearch-core-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.core.internal.io.IOUtils.close(IOUtils.java:86) ~[elasticsearch-core-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.common.network.CloseableChannel.closeChannels(CloseableChannel.java:89) ~[main/:?]
1> at org.elasticsearch.common.network.CloseableChannel.closeChannel(CloseableChannel.java:78) ~[main/:?]
2> WARNING: Uncaught exception in thread: Thread[elasticsearch[org.elasticsearch.transport.RemoteClusterConnectionTests][generic][T#3],5,TGRP-RemoteClusterConnectionTests]
2> java.lang.IllegalStateException: java.lang.InterruptedException
2> at __randomizedtesting.SeedInfo.seed([D538288F42486FCF]:0)
1> at org.elasticsearch.common.network.CloseableChannel.closeChannel(CloseableChannel.java:68) ~[main/:?]
2> at org.elasticsearch.transport.ConnectionManager.close(ConnectionManager.java:259)
2> at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:699)
1> at org.elasticsearch.transport.TcpTransport.onException(TcpTransport.java:591) ~[main/:?]
1> at org.elasticsearch.transport.nio.MockNioTransport.exceptionCaught(MockNioTransport.java:180) ~[framework-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.transport.nio.MockNioTransport$MockTcpChannelFactory.lambda$createChannel$2(MockNioTransport.java:214) ~[framework-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.ChannelContext.handleException(ChannelContext.java:99) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
2> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
1> at org.elasticsearch.nio.EventHandler.readException(EventHandler.java:129) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.transport.nio.TestEventHandler.readException(TestEventHandler.java:139) ~[framework-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.NioSelector.handleRead(NioSelector.java:411) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
2> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
1> at org.elasticsearch.nio.NioSelector.processKey(NioSelector.java:246) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.NioSelector.singleLoop(NioSelector.java:174) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
1> at org.elasticsearch.nio.NioSelector.runLoop(NioSelector.java:131) ~[elasticsearch-nio-8.0.0-SNAPSHOT.jar:8.0.0-SNAPSHOT]
2> at java.base/java.lang.Thread.run(Thread.java:834)
1> at java.lang.Thread.run(Thread.java:834) [?:?]
2> Caused by: java.lang.InterruptedException
It looks like we may be failing to properly handle the connect handler failure here and then never releasing the ref?
original-brownbear left a comment
After discussing with @ywelsch on another channel: LGTM. Waiting for a full close on the generic pool wasn't safe before either if the pool was already shut down, so this doesn't make things worse, and it fixes the tests :)
We have seen test failures on Windows (#44339) in RemoteClusterConnectionTests with the following stack trace:
Even though I don't have Windows to reproduce the issue, I think it's not specific to Windows but rather a timing-related issue. The problem is that RemoteClusterConnection closes the connection manager asynchronously, which races with the thread pool being shut down at the end of the test.
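The race described above can be reproduced in miniature with plain `java.util.concurrent` classes. The sketch below is an assumption-laden illustration, not the Elasticsearch code: `AsyncCloseRace` and `pooledCloseInterrupted` are hypothetical names, and the `pending` latch merely stands in for a close that is still waiting on in-progress connections. It shows that a close task running on a pool is interrupted when the pool is torn down with `shutdownNow()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo for illustration only; not part of Elasticsearch.
final class AsyncCloseRace {

    /** Returns true if the pooled "close" task was interrupted by pool shutdown. */
    static boolean pooledCloseInterrupted() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch pending = new CountDownLatch(1); // never released: a stuck pending connection
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();

        // Asynchronous close: the blocking wait happens on a pool thread.
        pool.execute(() -> {
            started.countDown();
            try {
                pending.await(); // stands in for waiting on in-progress connections
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        try {
            started.await();     // make sure the close task is running
            pool.shutdownNow();  // test teardown interrupts the pool's workers
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(pooledCloseInterrupted());
    }
}
```

Running the close on the calling thread removes this dependency on the pool's lifecycle, because shutting down the pool only interrupts the pool's own worker threads.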
Closes #44339, #44610