Skip to content

Commit e6f9f06

Browse files
NETWORKING: MockTransportService Wait for Close (#35038)
* NETWORKING: MockTransportService Wait for Close * Make `MockTransportService` wait `30s` for close listeners to run before failing the assertion * Closes #34990
1 parent 0072c90 commit e6f9f06

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void close() {
202202
threadPool.generic().execute(() -> {
203203
closeLock.writeLock().lock();
204204
try {
205-
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
205+
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
206206
// all instances and then clear them maps
207207
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
208208
while (iterator.hasNext()) {

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.test.transport;
2121

2222
import com.carrotsearch.randomizedtesting.SysGlobals;
23+
import java.util.concurrent.TimeUnit;
2324
import org.elasticsearch.Version;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.cluster.ClusterModule;
@@ -599,30 +600,37 @@ public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile
599600
Transport.Connection connection = super.openConnection(node, profile);
600601

601602
synchronized (openConnections) {
602-
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
603-
(n) -> new CopyOnWriteArrayList<>());
604-
connections.add(connection);
605-
}
606-
607-
connection.addCloseListener(ActionListener.wrap(() -> {
608-
synchronized (openConnections) {
609-
List<Transport.Connection> connections = openConnections.get(node);
610-
boolean remove = connections.remove(connection);
611-
assert remove : "Should have removed connection";
612-
if (connections.isEmpty()) {
613-
openConnections.remove(node);
603+
openConnections.computeIfAbsent(node, n -> new CopyOnWriteArrayList<>()).add(connection);
604+
connection.addCloseListener(ActionListener.wrap(() -> {
605+
synchronized (openConnections) {
606+
List<Transport.Connection> connections = openConnections.get(node);
607+
boolean remove = connections.remove(connection);
608+
assert remove : "Should have removed connection";
609+
if (connections.isEmpty()) {
610+
openConnections.remove(node);
611+
}
612+
if (openConnections.isEmpty()) {
613+
openConnections.notifyAll();
614+
}
614615
}
615-
}
616-
}));
616+
}));
617+
}
617618

618619
return connection;
619620
}
620621

621622
@Override
622623
protected void doClose() throws IOException {
623624
super.doClose();
624-
synchronized (openConnections) {
625-
assert openConnections.size() == 0 : "still open connections: " + openConnections;
625+
try {
626+
synchronized (openConnections) {
627+
if (openConnections.isEmpty() == false) {
628+
openConnections.wait(TimeUnit.SECONDS.toMillis(30L));
629+
}
630+
assert openConnections.size() == 0 : "still open connections: " + openConnections;
631+
}
632+
} catch (InterruptedException e) {
633+
throw new IllegalStateException(e);
626634
}
627635
}
628636

0 commit comments

Comments
 (0)