Skip to content

Commit 5573f83

Browse files
committed
Fix race condition in RemoteClusterConnection node supplier (#25432)
This commit fixes a race condition in the node supplier used by the RemoteClusterConnection. The node supplier stores an iterator over a set backed by a ConcurrentHashMap, but the get operation of the supplier calls multiple methods of the iterator and is susceptible to a race between the calls to hasNext() and next(). The test in this commit fails under the old implementation with a NoSuchElementException. This commit adds a wrapper object over a set and an iterator, with all methods being synchronized to avoid races. Modifications to the set result in the iterator being set to null, and the next retrieval creates a new iterator.
1 parent c8094fd commit 5573f83

File tree

2 files changed

+170
-28
lines changed

2 files changed

+170
-28
lines changed

core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
3434
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
3535
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
36-
import org.elasticsearch.action.search.SearchRequest;
3736
import org.elasticsearch.cluster.node.DiscoveryNode;
3837
import org.elasticsearch.cluster.node.DiscoveryNodes;
3938
import org.elasticsearch.common.component.AbstractComponent;
@@ -56,15 +55,13 @@
5655
import java.util.Set;
5756
import java.util.concurrent.ArrayBlockingQueue;
5857
import java.util.concurrent.BlockingQueue;
59-
import java.util.concurrent.ConcurrentHashMap;
6058
import java.util.concurrent.ExecutorService;
6159
import java.util.concurrent.RejectedExecutionException;
6260
import java.util.concurrent.Semaphore;
6361
import java.util.concurrent.atomic.AtomicBoolean;
6462
import java.util.function.Function;
6563
import java.util.function.Predicate;
6664
import java.util.stream.Collectors;
67-
import java.util.stream.Stream;
6865

6966
/**
7067
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -83,8 +80,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8380

8481
private final TransportService transportService;
8582
private final ConnectionProfile remoteProfile;
86-
private final Set<DiscoveryNode> connectedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
87-
private final Supplier<DiscoveryNode> nodeSupplier;
83+
private final ConnectedNodes connectedNodes;
8884
private final String clusterAlias;
8985
private final int maxNumRemoteConnections;
9086
private final Predicate<DiscoveryNode> nodePredicate;
@@ -116,19 +112,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
116112
TransportRequestOptions.Type.STATE,
117113
TransportRequestOptions.Type.RECOVERY);
118114
remoteProfile = builder.build();
119-
nodeSupplier = new Supplier<DiscoveryNode>() {
120-
private volatile Iterator<DiscoveryNode> current;
121-
@Override
122-
public DiscoveryNode get() {
123-
if (current == null || current.hasNext() == false) {
124-
current = connectedNodes.iterator();
125-
if (current.hasNext() == false) {
126-
throw new IllegalStateException("No node available for cluster: " + clusterAlias + " nodes: " + connectedNodes);
127-
}
128-
}
129-
return current.next();
130-
}
131-
};
115+
connectedNodes = new ConnectedNodes(clusterAlias);
132116
this.seedNodes = Collections.unmodifiableList(seedNodes);
133117
this.connectHandler = new ConnectHandler();
134118
transportService.addConnectionListener(this);
@@ -156,7 +140,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
156140
*/
157141
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
158142
ActionListener<ClusterSearchShardsResponse> listener) {
159-
if (connectedNodes.isEmpty()) {
143+
if (connectedNodes.size() == 0) {
160144
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
161145
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
162146
// we can't proceed with a search on a cluster level.
@@ -173,7 +157,7 @@ public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
173157
* will invoke the listener immediately.
174158
*/
175159
public void ensureConnected(ActionListener<Void> voidActionListener) {
176-
if (connectedNodes.isEmpty()) {
160+
if (connectedNodes.size() == 0) {
177161
connectHandler.connect(voidActionListener);
178162
} else {
179163
voidActionListener.onResponse(null);
@@ -182,7 +166,7 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {
182166

183167
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
184168
final ActionListener<ClusterSearchShardsResponse> listener) {
185-
final DiscoveryNode node = nodeSupplier.get();
169+
final DiscoveryNode node = connectedNodes.get();
186170
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
187171
new TransportResponseHandler<ClusterSearchShardsResponse>() {
188172

@@ -218,7 +202,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
218202
request.clear();
219203
request.nodes(true);
220204
request.local(true); // run this on the node that gets the request it's as good as any other
221-
final DiscoveryNode node = nodeSupplier.get();
205+
final DiscoveryNode node = connectedNodes.get();
222206
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
223207
new TransportResponseHandler<ClusterStateResponse>() {
224208
@Override
@@ -243,7 +227,7 @@ public String executor() {
243227
}
244228
});
245229
};
246-
if (connectedNodes.isEmpty()) {
230+
if (connectedNodes.size() == 0) {
247231
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
248232
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
249233
// we can't proceed with a search on a cluster level.
@@ -260,7 +244,7 @@ public String executor() {
260244
* given node.
261245
*/
262246
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
263-
DiscoveryNode discoveryNode = nodeSupplier.get();
247+
DiscoveryNode discoveryNode = connectedNodes.get();
264248
Transport.Connection connection = transportService.getConnection(discoveryNode);
265249
return new Transport.Connection() {
266250
@Override
@@ -283,12 +267,11 @@ public void close() throws IOException {
283267
}
284268

285269
Transport.Connection getConnection() {
286-
DiscoveryNode discoveryNode = nodeSupplier.get();
270+
DiscoveryNode discoveryNode = connectedNodes.get();
287271
return transportService.getConnection(discoveryNode);
288272
}
289273

290-
291-
@Override
274+
@Override
292275
public void close() throws IOException {
293276
connectHandler.close();
294277
}
@@ -583,12 +566,19 @@ boolean isNodeConnected(final DiscoveryNode node) {
583566
return connectedNodes.contains(node);
584567
}
585568

569+
DiscoveryNode getConnectedNode() {
570+
return connectedNodes.get();
571+
}
572+
573+
void addConnectedNode(DiscoveryNode node) {
574+
connectedNodes.add(node);
575+
}
586576

587577
/**
588578
* Fetches connection info for this connection
589579
*/
590580
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
591-
final Optional<DiscoveryNode> anyNode = connectedNodes.stream().findAny();
581+
final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
592582
if (anyNode.isPresent() == false) {
593583
// not connected we return immediately
594584
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
@@ -650,4 +640,68 @@ public String executor() {
650640
int getNumNodesConnected() {
651641
return connectedNodes.size();
652642
}
643+
644+
private static class ConnectedNodes implements Supplier<DiscoveryNode> {
645+
646+
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
647+
private final String clusterAlias;
648+
649+
private Iterator<DiscoveryNode> currentIterator = null;
650+
651+
private ConnectedNodes(String clusterAlias) {
652+
this.clusterAlias = clusterAlias;
653+
}
654+
655+
@Override
656+
public synchronized DiscoveryNode get() {
657+
ensureIteratorAvailable();
658+
if (currentIterator.hasNext()) {
659+
return currentIterator.next();
660+
} else {
661+
throw new IllegalStateException("No node available for cluster: " + clusterAlias);
662+
}
663+
}
664+
665+
synchronized boolean remove(DiscoveryNode node) {
666+
final boolean setRemoval = nodeSet.remove(node);
667+
if (setRemoval) {
668+
currentIterator = null;
669+
}
670+
return setRemoval;
671+
}
672+
673+
synchronized boolean add(DiscoveryNode node) {
674+
final boolean added = nodeSet.add(node);
675+
if (added) {
676+
currentIterator = null;
677+
}
678+
return added;
679+
}
680+
681+
synchronized int size() {
682+
return nodeSet.size();
683+
}
684+
685+
synchronized boolean contains(DiscoveryNode node) {
686+
return nodeSet.contains(node);
687+
}
688+
689+
synchronized Optional<DiscoveryNode> getAny() {
690+
ensureIteratorAvailable();
691+
if (currentIterator.hasNext()) {
692+
return Optional.of(currentIterator.next());
693+
} else {
694+
return Optional.empty();
695+
}
696+
}
697+
698+
private synchronized void ensureIteratorAvailable() {
699+
if (currentIterator == null) {
700+
currentIterator = nodeSet.iterator();
701+
} else if (currentIterator.hasNext() == false && nodeSet.isEmpty() == false) {
702+
// iterator rollover
703+
currentIterator = nodeSet.iterator();
704+
}
705+
}
706+
}
653707
}

core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.elasticsearch.transport;
2020

2121
import org.apache.lucene.store.AlreadyClosedException;
22+
import org.apache.lucene.util.IOUtils;
2223
import org.elasticsearch.Build;
2324
import org.elasticsearch.Version;
2425
import org.elasticsearch.action.ActionListener;
@@ -73,6 +74,7 @@
7374
import java.util.concurrent.RejectedExecutionException;
7475
import java.util.concurrent.TimeUnit;
7576
import java.util.concurrent.atomic.AtomicBoolean;
77+
import java.util.concurrent.atomic.AtomicInteger;
7678
import java.util.concurrent.atomic.AtomicReference;
7779
import java.util.function.Function;
7880

@@ -820,4 +822,90 @@ public void testCollectNodes() throws Exception {
820822
}
821823
}
822824
}
825+
826+
public void testConnectedNodesConcurrentAccess() throws IOException, InterruptedException {
827+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
828+
List<MockTransportService> discoverableTransports = new CopyOnWriteArrayList<>();
829+
try {
830+
final int numDiscoverableNodes = randomIntBetween(5, 20);
831+
List<DiscoveryNode> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
832+
for (int i = 0; i < numDiscoverableNodes; i++ ) {
833+
MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT);
834+
discoverableNodes.add(transportService.getLocalDiscoNode());
835+
discoverableTransports.add(transportService);
836+
}
837+
838+
List<DiscoveryNode> seedNodes = randomSubsetOf(discoverableNodes);
839+
Collections.shuffle(seedNodes, random());
840+
841+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
842+
service.start();
843+
service.acceptIncomingRequests();
844+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
845+
seedNodes, service, Integer.MAX_VALUE, n -> true)) {
846+
final int numGetThreads = randomIntBetween(4, 10);
847+
final Thread[] getThreads = new Thread[numGetThreads];
848+
final int numModifyingThreads = randomIntBetween(4, 10);
849+
final Thread[] modifyingThreads = new Thread[numModifyingThreads];
850+
CyclicBarrier barrier = new CyclicBarrier(numGetThreads + numModifyingThreads);
851+
for (int i = 0; i < getThreads.length; i++) {
852+
final int numGetCalls = randomIntBetween(1000, 10000);
853+
getThreads[i] = new Thread(() -> {
854+
try {
855+
barrier.await();
856+
for (int j = 0; j < numGetCalls; j++) {
857+
try {
858+
DiscoveryNode node = connection.getConnectedNode();
859+
assertNotNull(node);
860+
} catch (IllegalStateException e) {
861+
if (e.getMessage().startsWith("No node available for cluster:") == false) {
862+
throw e;
863+
}
864+
}
865+
}
866+
} catch (Exception ex) {
867+
throw new AssertionError(ex);
868+
}
869+
});
870+
getThreads[i].start();
871+
}
872+
873+
final AtomicInteger counter = new AtomicInteger();
874+
for (int i = 0; i < modifyingThreads.length; i++) {
875+
final int numDisconnects = randomIntBetween(5, 10);
876+
modifyingThreads[i] = new Thread(() -> {
877+
try {
878+
barrier.await();
879+
for (int j = 0; j < numDisconnects; j++) {
880+
if (randomBoolean()) {
881+
MockTransportService transportService =
882+
startTransport("discoverable_node_added" + counter.incrementAndGet(), knownNodes,
883+
Version.CURRENT);
884+
discoverableTransports.add(transportService);
885+
connection.addConnectedNode(transportService.getLocalDiscoNode());
886+
} else {
887+
DiscoveryNode node = randomFrom(discoverableNodes);
888+
connection.onNodeDisconnected(node);
889+
}
890+
}
891+
} catch (Exception ex) {
892+
throw new AssertionError(ex);
893+
}
894+
});
895+
modifyingThreads[i].start();
896+
}
897+
898+
for (Thread thread : getThreads) {
899+
thread.join();
900+
}
901+
for (Thread thread : modifyingThreads) {
902+
thread.join();
903+
}
904+
}
905+
}
906+
} finally {
907+
IOUtils.closeWhileHandlingException(discoverableTransports);
908+
}
909+
}
910+
823911
}

0 commit comments

Comments
 (0)