Skip to content

Commit 1a573d2

Browse files
committed
Add remote cluster infrastructure to fetch discovery nodes. (#25123)
In order to add scroll support for cross cluster search we need to resolve the nodes encoded in the scroll ID to send requests to the corresponding nodes. This change adds the low level connection infrastructure that also ensures that connections are re-established if the cluster is disconnected due to a network failure or restarts. Relates to #25094
1 parent fb19633 commit 1a573d2

File tree

4 files changed

+281
-6
lines changed

4 files changed

+281
-6
lines changed

core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@
6161
import java.util.concurrent.RejectedExecutionException;
6262
import java.util.concurrent.Semaphore;
6363
import java.util.concurrent.atomic.AtomicBoolean;
64+
import java.util.function.Function;
6465
import java.util.function.Predicate;
6566
import java.util.stream.Collectors;
67+
import java.util.stream.Stream;
6668

6769
/**
6870
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
@@ -206,6 +208,53 @@ public String executor() {
206208
});
207209
}
208210

211+
/**
212+
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
213+
* that returns <code>null</code> if the node ID is not found.
214+
*/
215+
void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
216+
Runnable runnable = () -> {
217+
final ClusterStateRequest request = new ClusterStateRequest();
218+
request.clear();
219+
request.nodes(true);
220+
request.local(true); // run this on the node that gets the request it's as good as any other
221+
final DiscoveryNode node = nodeSupplier.get();
222+
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
223+
new TransportResponseHandler<ClusterStateResponse>() {
224+
@Override
225+
public ClusterStateResponse newInstance() {
226+
return new ClusterStateResponse();
227+
}
228+
229+
@Override
230+
public void handleResponse(ClusterStateResponse response) {
231+
DiscoveryNodes nodes = response.getState().nodes();
232+
listener.onResponse(nodes::get);
233+
}
234+
235+
@Override
236+
public void handleException(TransportException exp) {
237+
listener.onFailure(exp);
238+
}
239+
240+
@Override
241+
public String executor() {
242+
return ThreadPool.Names.SAME;
243+
}
244+
});
245+
};
246+
if (connectedNodes.isEmpty()) {
247+
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
248+
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
249+
// we can't proceed with a search on a cluster level.
250+
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the
251+
// caller end since they provide the listener.
252+
ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure));
253+
} else {
254+
runnable.run();
255+
}
256+
}
257+
209258
/**
210259
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
211260
* given node.

core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.util.concurrent.TimeUnit;
5252
import java.util.concurrent.TimeoutException;
5353
import java.util.concurrent.atomic.AtomicReference;
54+
import java.util.function.BiFunction;
55+
import java.util.function.Function;
5456
import java.util.function.Predicate;
5557
import java.util.stream.Collectors;
5658

@@ -346,4 +348,44 @@ public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionI
346348
}
347349
}
348350
}
351+
352+
/**
353+
* Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
354+
* function on success.
355+
*/
356+
public void collectNodes(Set<String> clusters, ActionListener<BiFunction<String, String, DiscoveryNode>> listener) {
357+
Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
358+
for (String cluster : clusters) {
359+
if (remoteClusters.containsKey(cluster) == false) {
360+
listener.onFailure(new IllegalArgumentException("no such remote cluster: [" + cluster + "]"));
361+
return;
362+
}
363+
}
364+
365+
final Map<String, Function<String, DiscoveryNode>> clusterMap = new HashMap<>();
366+
CountDown countDown = new CountDown(clusters.size());
367+
Function<String, DiscoveryNode> nullFunction = s -> null;
368+
for (final String cluster : clusters) {
369+
RemoteClusterConnection connection = remoteClusters.get(cluster);
370+
connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {
371+
@Override
372+
public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
373+
synchronized (clusterMap) {
374+
clusterMap.put(cluster, nodeLookup);
375+
}
376+
if (countDown.countDown()) {
377+
listener.onResponse((clusterAlias, nodeId)
378+
-> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
379+
}
380+
}
381+
382+
@Override
383+
public void onFailure(Exception e) {
384+
if (countDown.fastForward()) { // we need to check if it's true since we could have multiple failures
385+
listener.onFailure(e);
386+
}
387+
}
388+
});
389+
}
390+
}
349391
}

core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@
5656
import org.elasticsearch.test.transport.MockTransportService;
5757
import org.elasticsearch.threadpool.TestThreadPool;
5858
import org.elasticsearch.threadpool.ThreadPool;
59-
import org.elasticsearch.transport.RemoteClusterConnection;
60-
import org.elasticsearch.transport.RemoteConnectionInfo;
61-
import org.elasticsearch.transport.RemoteTransportException;
62-
import org.elasticsearch.transport.TransportConnectionListener;
63-
import org.elasticsearch.transport.TransportService;
6459

6560
import java.io.IOException;
6661
import java.net.InetAddress;
@@ -79,6 +74,7 @@
7974
import java.util.concurrent.TimeUnit;
8075
import java.util.concurrent.atomic.AtomicBoolean;
8176
import java.util.concurrent.atomic.AtomicReference;
77+
import java.util.function.Function;
8278

8379
import static java.util.Collections.emptyMap;
8480
import static java.util.Collections.emptySet;
@@ -358,7 +354,6 @@ public void run() {
358354

359355
public void testFetchShards() throws Exception {
360356
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
361-
362357
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
363358
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
364359
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
@@ -787,4 +782,42 @@ public void onFailure(Exception e) {
787782
}
788783
}
789784
}
785+
786+
public void testCollectNodes() throws Exception {
787+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
788+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
789+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
790+
knownNodes.add(seedTransport.getLocalDiscoNode());
791+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
792+
service.start();
793+
service.acceptIncomingRequests();
794+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
795+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
796+
if (randomBoolean()) {
797+
updateSeedNodes(connection, Arrays.asList(seedNode));
798+
}
799+
CountDownLatch responseLatch = new CountDownLatch(1);
800+
AtomicReference<Function<String, DiscoveryNode>> reference = new AtomicReference<>();
801+
AtomicReference<Exception> failReference = new AtomicReference<>();
802+
ActionListener<Function<String, DiscoveryNode>> shardsListener = ActionListener.wrap(
803+
x -> {
804+
reference.set(x);
805+
responseLatch.countDown();
806+
},
807+
x -> {
808+
failReference.set(x);
809+
responseLatch.countDown();
810+
});
811+
connection.collectNodes(shardsListener);
812+
responseLatch.await();
813+
assertNull(failReference.get());
814+
assertNotNull(reference.get());
815+
Function<String, DiscoveryNode> function = reference.get();
816+
assertEquals(seedNode, function.apply(seedNode.getId()));
817+
assertNull(function.apply(seedNode.getId() + "foo"));
818+
assertTrue(connection.assertNoRunningConnections());
819+
}
820+
}
821+
}
822+
}
790823
}

core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.apache.lucene.util.IOUtils;
2122
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -34,11 +35,14 @@
3435
import java.net.InetSocketAddress;
3536
import java.util.Arrays;
3637
import java.util.Collections;
38+
import java.util.HashSet;
3739
import java.util.List;
3840
import java.util.Map;
3941
import java.util.concurrent.CopyOnWriteArrayList;
4042
import java.util.concurrent.CountDownLatch;
4143
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicReference;
45+
import java.util.function.BiFunction;
4246

4347
public class RemoteClusterServiceTests extends ESTestCase {
4448

@@ -303,4 +307,151 @@ private ActionListener<Void> connectionListener(final CountDownLatch latch) {
303307
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
304308
}
305309

310+
311+
public void testCollectNodes() throws InterruptedException, IOException {
312+
final Settings settings = Settings.EMPTY;
313+
final List<DiscoveryNode> knownNodes_c1 = new CopyOnWriteArrayList<>();
314+
final List<DiscoveryNode> knownNodes_c2 = new CopyOnWriteArrayList<>();
315+
316+
try (MockTransportService c1N1 =
317+
startTransport("cluster_1_node_1", knownNodes_c1, Version.CURRENT);
318+
MockTransportService c1N2 =
319+
startTransport("cluster_1_node_2", knownNodes_c1, Version.CURRENT);
320+
MockTransportService c2N1 =
321+
startTransport("cluster_2_node_1", knownNodes_c2, Version.CURRENT);
322+
MockTransportService c2N2 =
323+
startTransport("cluster_2_node_2", knownNodes_c2, Version.CURRENT)) {
324+
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
325+
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
326+
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
327+
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
328+
knownNodes_c1.add(c1N1Node);
329+
knownNodes_c1.add(c1N2Node);
330+
knownNodes_c2.add(c2N1Node);
331+
knownNodes_c2.add(c2N2Node);
332+
Collections.shuffle(knownNodes_c1, random());
333+
Collections.shuffle(knownNodes_c2, random());
334+
335+
try (MockTransportService transportService = MockTransportService.createNewService(
336+
settings,
337+
Version.CURRENT,
338+
threadPool,
339+
null)) {
340+
transportService.start();
341+
transportService.acceptIncomingRequests();
342+
final Settings.Builder builder = Settings.builder();
343+
builder.putArray(
344+
"search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
345+
builder.putArray(
346+
"search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
347+
try (RemoteClusterService service =
348+
new RemoteClusterService(settings, transportService)) {
349+
assertFalse(service.isCrossClusterSearchEnabled());
350+
service.initializeRemoteClusters();
351+
assertFalse(service.isCrossClusterSearchEnabled());
352+
353+
final InetSocketAddress c1N1Address = ((InetSocketTransportAddress)c1N1Node.getAddress()).address();
354+
final InetSocketAddress c1N2Address = ((InetSocketTransportAddress)c1N2Node.getAddress()).address();
355+
final InetSocketAddress c2N1Address = ((InetSocketTransportAddress)c2N1Node.getAddress()).address();
356+
final InetSocketAddress c2N2Address = ((InetSocketTransportAddress)c2N2Node.getAddress()).address();
357+
358+
final CountDownLatch firstLatch = new CountDownLatch(1);
359+
service.updateRemoteCluster(
360+
"cluster_1",
361+
Arrays.asList(c1N1Address, c1N2Address),
362+
connectionListener(firstLatch));
363+
firstLatch.await();
364+
365+
final CountDownLatch secondLatch = new CountDownLatch(1);
366+
service.updateRemoteCluster(
367+
"cluster_2",
368+
Arrays.asList(c2N1Address, c2N2Address),
369+
connectionListener(secondLatch));
370+
secondLatch.await();
371+
CountDownLatch latch = new CountDownLatch(1);
372+
service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")),
373+
new ActionListener<BiFunction<String, String, DiscoveryNode>>() {
374+
@Override
375+
public void onResponse(BiFunction<String, String, DiscoveryNode> func) {
376+
try {
377+
assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId()));
378+
assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId()));
379+
assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId()));
380+
assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId()));
381+
} finally {
382+
latch.countDown();
383+
}
384+
}
385+
386+
@Override
387+
public void onFailure(Exception e) {
388+
try {
389+
throw new AssertionError(e);
390+
} finally {
391+
latch.countDown();
392+
}
393+
}
394+
});
395+
latch.await();
396+
{
397+
CountDownLatch failLatch = new CountDownLatch(1);
398+
AtomicReference<Exception> ex = new AtomicReference<>();
399+
service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")),
400+
new ActionListener<BiFunction<String, String, DiscoveryNode>>() {
401+
@Override
402+
public void onResponse(BiFunction<String, String, DiscoveryNode> stringStringDiscoveryNodeBiFunction) {
403+
try {
404+
fail("should not be called");
405+
} finally {
406+
failLatch.countDown();
407+
}
408+
}
409+
410+
@Override
411+
public void onFailure(Exception e) {
412+
try {
413+
ex.set(e);
414+
} finally {
415+
failLatch.countDown();
416+
}
417+
}
418+
});
419+
failLatch.await();
420+
assertNotNull(ex.get());
421+
assertTrue(ex.get() instanceof IllegalArgumentException);
422+
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
423+
}
424+
{
425+
// close all targets and check for the transport level failure path
426+
IOUtils.close(c1N1, c1N2, c2N1, c2N2);
427+
CountDownLatch failLatch = new CountDownLatch(1);
428+
AtomicReference<Exception> ex = new AtomicReference<>();
429+
service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")),
430+
new ActionListener<BiFunction<String, String, DiscoveryNode>>() {
431+
@Override
432+
public void onResponse(BiFunction<String, String, DiscoveryNode> stringStringDiscoveryNodeBiFunction) {
433+
try {
434+
fail("should not be called");
435+
} finally {
436+
failLatch.countDown();
437+
}
438+
}
439+
440+
@Override
441+
public void onFailure(Exception e) {
442+
try {
443+
ex.set(e);
444+
} finally {
445+
failLatch.countDown();
446+
}
447+
}
448+
});
449+
failLatch.await();
450+
assertNotNull(ex.get());
451+
assertTrue(ex.get().getClass().toString(), ex.get() instanceof TransportException);
452+
}
453+
}
454+
}
455+
}
456+
}
306457
}

0 commit comments

Comments
 (0)