
Commit 44df762

Rebuild remote connections on profile changes (#39146)
Currently remote compression and ping schedule settings are dynamic. However, we do not listen for changes. This commit adds listeners for changes to those two settings. Additionally, when those settings change we now close existing connections and open new ones with the settings applied. Fixes #37201.
1 parent 34d0647 commit 44df762
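
In short, the per-cluster `transport.compress` and `transport.ping_schedule` settings now have registered listeners, and a change to either rebuilds the affected remote connection. A minimal, self-contained sketch of that rebuild-on-change idea; the `RemoteConnectionRegistry`, `RemoteProfile`, and `RemoteConnection` types below are hypothetical stand-ins, not the classes this commit touches:

// Minimal sketch of the rebuild-on-change behaviour described above. RemoteProfile,
// RemoteConnection and RemoteConnectionRegistry are hypothetical stand-ins, not the
// Elasticsearch classes modified in this commit.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RemoteConnectionRegistry {

    // Hypothetical per-cluster profile: just the two dynamic settings this commit cares about.
    static final class RemoteProfile {
        final boolean compress;
        final String pingSchedule;

        RemoteProfile(boolean compress, String pingSchedule) {
            this.compress = compress;
            this.pingSchedule = pingSchedule;
        }
    }

    interface RemoteConnection extends AutoCloseable {
        @Override
        void close(); // no checked exception, so it can back a lambda below
    }

    private final Map<String, RemoteConnection> connections = new ConcurrentHashMap<>();

    // Invoked by a settings-change listener: open a connection with the new profile first,
    // then close the old one, so the alias is never left without a connection entry.
    void onProfileChange(String clusterAlias, RemoteProfile newProfile) {
        RemoteConnection previous = connections.put(clusterAlias, open(clusterAlias, newProfile));
        if (previous != null) {
            previous.close(); // in-flight requests on the old connection may fail
        }
    }

    private RemoteConnection open(String clusterAlias, RemoteProfile profile) {
        // Placeholder: a real implementation would dial the cluster's seed nodes with
        // profile.compress and profile.pingSchedule applied.
        return () -> { };
    }
}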

16 files changed: +342 -133 lines changed

docs/reference/modules/remote-clusters.asciidoc

Lines changed: 55 additions & 19 deletions
@@ -27,39 +27,39 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster.
 
 [float]
 [[configuring-remote-clusters]]
-=== Configuring Remote Clusters
+=== Configuring remote clusters
 
-Remote clusters can be specified globally using
-<<cluster-update-settings,cluster settings>> (which can be updated dynamically),
-or local to individual nodes using the `elasticsearch.yml` file.
+You can configure remote clusters globally by using
+<<cluster-update-settings,cluster settings>>, which you can update dynamically.
+Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file.
 
-If a remote cluster is configured via `elasticsearch.yml` only the nodes with
-that configuration will be able to connect to the remote cluster. In other
-words, functionality that relies on remote cluster requests will have to be
-driven specifically from those nodes. Remote clusters set via the
-<<cluster-update-settings,cluster settings API>> will be available on every node
-in the cluster.
-
-The `elasticsearch.yml` config file for a node that connects to remote clusters
-needs to list the remote clusters that should be connected to, for instance:
+If you specify the settings in `elasticsearch.yml` files, only the nodes with
+those settings can connect to the remote cluster. In other words, functionality
+that relies on remote cluster requests must be driven specifically from those
+nodes. For example:
 
 [source,yaml]
 --------------------------------
 cluster:
     remote:
         cluster_one: <1>
            seeds: 127.0.0.1:9300
-        cluster_two: <1>
+           transport.ping_schedule: 30s <2>
+        cluster_two:
            seeds: 127.0.0.1:9301
+           transport.compress: true <3>
 
 --------------------------------
 <1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing
 the connection to each cluster. These names are subsequently used to distinguish
 between local and remote indices.
+<2> A keep-alive ping is configured for `cluster_one`.
+<3> Compression is explicitly enabled for requests to `cluster_two`.
+
+For more information about the optional transport settings, see
+<<modules-transport>>.
 
-The equivalent example using the <<cluster-update-settings,cluster settings
-API>> to add remote clusters to all nodes in the cluster would look like the
-following:
+If you use <<cluster-update-settings,cluster settings>>, the remote clusters are available on every node in the cluster. For example:
 
 [source,js]
 --------------------------------
@@ -71,12 +71,14 @@ PUT _cluster/settings
         "cluster_one": {
           "seeds": [
             "127.0.0.1:9300"
-          ]
+          ],
+          "transport.ping_schedule": "30s"
         },
         "cluster_two": {
           "seeds": [
             "127.0.0.1:9301"
-          ]
+          ],
+          "transport.compress": true
         },
         "cluster_three": {
           "seeds": [
@@ -92,6 +94,40 @@ PUT _cluster/settings
 // TEST[setup:host]
 // TEST[s/127.0.0.1:9300/\${transport_host}/]
 
+You can dynamically update the compression and ping schedule settings. However,
+you must re-include seeds in the settings update request. For example:
+
+[source,js]
+--------------------------------
+PUT _cluster/settings
+{
+  "persistent": {
+    "cluster": {
+      "remote": {
+        "cluster_one": {
+          "seeds": [
+            "127.0.0.1:9300"
+          ],
+          "transport.ping_schedule": "60s"
+        },
+        "cluster_two": {
+          "seeds": [
+            "127.0.0.1:9301"
+          ],
+          "transport.compress": false
+        }
+      }
+    }
+  }
+}
+--------------------------------
+// CONSOLE
+// TEST[continued]
+
+NOTE: When the compression or ping schedule settings change, all the existing
+node connections must close and re-open, which can cause in-flight requests to
+fail.
+
 A remote cluster can be deleted from the cluster settings by setting its seeds
 to `null`:
 
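The per-cluster settings above are flat keys of the form `cluster.remote.<alias>.<setting>`, for example `cluster.remote.cluster_one.transport.ping_schedule`. A small plain-Java sketch (illustrative only, not the Elasticsearch `Settings` API) of how such flat keys group by cluster alias; the affix-settings machinery in the Java changes below does this for real:

// Groups flat remote-cluster keys by alias. Names and structure are illustrative only.
import java.util.HashMap;
import java.util.Map;

final class RemoteSettingKeys {

    private static final String PREFIX = "cluster.remote.";

    // Groups flat keys into alias -> (setting suffix -> value).
    static Map<String, Map<String, String>> groupByAlias(Map<String, String> flatSettings) {
        Map<String, Map<String, String>> byAlias = new HashMap<>();
        for (Map.Entry<String, String> e : flatSettings.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                continue;
            }
            String rest = e.getKey().substring(PREFIX.length());
            int dot = rest.indexOf('.');
            String alias = rest.substring(0, dot);       // e.g. "cluster_one"
            String suffix = rest.substring(dot + 1);     // e.g. "transport.ping_schedule"
            byAlias.computeIfAbsent(alias, a -> new HashMap<>()).put(suffix, e.getValue());
        }
        return byAlias;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new HashMap<>();
        flat.put("cluster.remote.cluster_one.seeds", "127.0.0.1:9300");
        flat.put("cluster.remote.cluster_one.transport.ping_schedule", "60s");
        flat.put("cluster.remote.cluster_two.transport.compress", "false");
        System.out.println(groupByAlias(flat));
    }
}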
server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 17 additions & 34 deletions
@@ -30,16 +30,13 @@
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable {
     private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
     private final KeyedLock<String> connectionLock = new KeyedLock<>();
     private final Transport transport;
-    private final ThreadPool threadPool;
     private final ConnectionProfile defaultProfile;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
     private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
     private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
 
-    public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
-        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool);
+    public ConnectionManager(Settings settings, Transport transport) {
+        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
     }
 
-    public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) {
+    public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
         this.transport = transport;
-        this.threadPool = threadPool;
         this.defaultProfile = connectionProfile;
     }
 
@@ -185,35 +180,23 @@ public int size() {
 
     @Override
     public void close() {
+        Transports.assertNotTransportThread("Closing ConnectionManager");
         if (isClosed.compareAndSet(false, true)) {
-            CountDownLatch latch = new CountDownLatch(1);
-
-            // TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
-            threadPool.generic().execute(() -> {
-                closeLock.writeLock().lock();
-                try {
-                    // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
-                    // all instances and then clear them maps
-                    Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
-                    while (iterator.hasNext()) {
-                        Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
-                        try {
-                            IOUtils.closeWhileHandlingException(next.getValue());
-                        } finally {
-                            iterator.remove();
-                        }
+            closeLock.writeLock().lock();
+            try {
+                // we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
+                // all instances and then clear them maps
+                Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
+                    try {
+                        IOUtils.closeWhileHandlingException(next.getValue());
+                    } finally {
+                        iterator.remove();
                     }
-                } finally {
-                    closeLock.writeLock().unlock();
-                    latch.countDown();
                 }
-            });
-
-            try {
-                latch.await(30, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                // ignore
+            } finally {
+                closeLock.writeLock().unlock();
             }
         }
     }

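With this change, `ConnectionManager#close` is synchronous and asserts that it is not running on a transport thread, so any caller that may be on a network thread has to hand the blocking close off to a general-purpose executor. A caller-side sketch of that pattern, with simplified stand-in types (an `ExecutorService` plays the role of `threadPool.generic()`):

// Caller-side sketch of the new contract: the close call blocks until all node
// connections are closed, so it must never run on a transport/network thread.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CloseOffloadSketch {

    interface BlockingCloseable {
        void close(); // blocks until every node connection has been closed
    }

    static void closeWithoutBlockingNetworkThread(BlockingCloseable connections, ExecutorService generic) {
        // Same pattern as RemoteClusterConnection#close in this commit:
        // threadPool.generic().execute(connectionManager::close);
        generic.execute(connections::close);
    }

    public static void main(String[] args) {
        ExecutorService generic = Executors.newSingleThreadExecutor();
        closeWithoutBlockingNetworkThread(() -> System.out.println("closing all connections"), generic);
        generic.shutdown();
    }
}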
server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 24 additions & 6 deletions
@@ -30,6 +30,7 @@
 import org.elasticsearch.common.settings.SettingUpgrader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -282,21 +283,38 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
         return perClusterIndices;
     }
 
+    void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
+        Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
+        TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
+        updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
+    }
+
+    void updateRemoteCluster(String clusterAlias, Settings settings) {
+        String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
+        List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
+        Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
+        TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
+            .getConcreteSettingForNamespace(clusterAlias)
+            .get(settings);
+
+        updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
+    }
+
     /**
      * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
      * empty the cluster alias is unregistered and should be removed.
      */
-    protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
+    protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
+                                                TimeValue pingSchedule);
 
     /**
      * Registers this instance to listen to updates on the cluster settings.
      */
     public void listenForUpdates(ClusterSettings clusterSettings) {
-        clusterSettings.addAffixUpdateConsumer(
-            RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
-            RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
-            (key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
-            (namespace, value) -> {});
+        List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
+            RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
+            RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
+        clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
        clusterSettings.addAffixUpdateConsumer(
             RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
             RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,

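The widened callback means that a change to any of the grouped per-alias settings (seeds, proxy, compression, ping schedule) is delivered together, so the receiver can rebuild the connection with the full profile. A self-contained sketch of that callback shape; `RemoteClusterUpdateListener` is a hypothetical interface, not the abstract method on `RemoteClusterAware` itself:

// Self-contained sketch of the grouped per-alias update delivered by listenForUpdates().
import java.util.Arrays;
import java.util.List;

final class RemoteClusterUpdateExample {

    interface RemoteClusterUpdateListener {
        void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy,
                                 boolean compressionEnabled, String pingSchedule);
    }

    public static void main(String[] args) {
        RemoteClusterUpdateListener listener = (alias, addresses, proxy, compress, ping) ->
            System.out.println(alias + " -> seeds=" + addresses + ", proxy=" + proxy
                + ", compress=" + compress + ", ping=" + ping);

        // Simulates the settings-update path: a change to any of the grouped settings for
        // "cluster_one" re-delivers the whole group so the connection can be rebuilt.
        listener.updateRemoteCluster("cluster_one", Arrays.asList("127.0.0.1:9300"), "", true, "60s");
    }
}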
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 16 additions & 18 deletions
@@ -64,9 +64,6 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
-import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE;
-
 /**
  * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
  * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
@@ -107,12 +104,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
      * @param maxNumRemoteConnections the maximum number of connections to the remote cluster
      * @param nodePredicate a predicate to filter eligible remote nodes to connect to
      * @param proxyAddress the proxy address
+     * @param connectionProfile the connection profile to use
      */
     RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
                             TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
-                            String proxyAddress) {
+                            String proxyAddress, ConnectionProfile connectionProfile) {
         this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
-            createConnectionManager(settings, clusterAlias, transportService));
+            createConnectionManager(connectionProfile, transportService));
     }
 
     // Public for tests to pass a StubbableConnectionManager
@@ -309,13 +307,23 @@ Transport.Connection getConnection() {
 
     @Override
     public void close() throws IOException {
-        IOUtils.close(connectHandler, connectionManager);
+        IOUtils.close(connectHandler);
+        // In the ConnectionManager we wait on connections being closed.
+        threadPool.generic().execute(connectionManager::close);
     }
 
     public boolean isClosed() {
         return connectHandler.isClosed();
     }
 
+    public String getProxyAddress() {
+        return proxyAddress;
+    }
+
+    public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
+        return seedNodes;
+    }
+
     /**
      * The connect handler manages node discovery and the actual connect to the remote cluster.
      * There is at most one connect job running at any time. If such a connect job is triggered
@@ -697,18 +705,8 @@ private synchronized void ensureIteratorAvailable() {
         }
     }
 
-    private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) {
-        ConnectionProfile.Builder builder = new ConnectionProfile.Builder()
-            .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
-            .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
-            .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
-            // we don't want this to be used for anything else but search
-            .addConnections(0, TransportRequestOptions.Type.BULK,
-                TransportRequestOptions.Type.STATE,
-                TransportRequestOptions.Type.RECOVERY)
-            .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
-            .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings));
-        return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool);
+    private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
+        return new ConnectionManager(connectionProfile, transportService.transport);
     }
 
     ConnectionManager getConnectionManager() {

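Since the connection profile is now supplied by the caller, building it presumably looks much like the code removed above. A sketch of such a helper, using only the builder calls visible in the removed code; the class and method names are hypothetical, and it assumes the file sits in the `org.elasticsearch.transport` package so the package-level setting constants resolve:

// Sketch only: mirrors the removed createConnectionManager body as a helper the caller
// (presumably RemoteClusterService) could use to build the per-alias profile it now
// passes into RemoteClusterConnection.
package org.elasticsearch.transport; // assumed, so the package-level constants are visible

import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE;

final class RemoteProfileSketch {

    private RemoteProfileSketch() {}

    static ConnectionProfile buildRemoteProfile(Settings settings, String clusterAlias) {
        return new ConnectionProfile.Builder()
            .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
            .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
            // six connections shared by regular requests and pings, as in the removed code
            .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
            // remote connections are only used for search, so no bulk/state/recovery channels
            .addConnections(0, TransportRequestOptions.Type.BULK,
                TransportRequestOptions.Type.STATE,
                TransportRequestOptions.Type.RECOVERY)
            .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
            .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings))
            .build();
    }
}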