Skip to content

Commit 0cedb9e

Browse files
authored
Update remote cluster stats to support simple mode (#49961)
Remote cluster stats API currently only returns useful information if the strategy in use is the SNIFF mode. This PR modifies the API to provide relevant information if the user is in the SIMPLE mode. This information is the configured addresses, max socket connections, and open socket connections.
1 parent 681e9fd commit 0cedb9e

File tree

12 files changed

+344
-117
lines changed

12 files changed

+344
-117
lines changed

docs/reference/ccr/getting-started.asciidoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ remote cluster.
135135
"num_nodes_connected" : 1, <2>
136136
"max_connections_per_cluster" : 3,
137137
"initial_connect_timeout" : "30s",
138-
"skip_unavailable" : false
138+
"skip_unavailable" : false,
139+
"mode" : "sniff"
139140
}
140141
}
141142
--------------------------------------------------
@@ -146,7 +147,7 @@ remote cluster.
146147
alias `leader`
147148
<2> This shows the number of nodes in the remote cluster the local cluster is
148149
connected to.
149-
150+
150151
Alternatively, you can manage remote clusters on the
151152
*Management / Elasticsearch / Remote Clusters* page in {kib}:
152153

qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
---
2-
"Fetch remote cluster info for existing cluster":
2+
"Fetch remote cluster sniff info for existing cluster":
33

44
- do:
55
cluster.remote_info: {}
66
- match: { my_remote_cluster.connected: true }
77
- match: { my_remote_cluster.num_nodes_connected: 1}
88
- match: { my_remote_cluster.max_connections_per_cluster: 1}
99
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
10+
- match: { my_remote_cluster.mode: "sniff" }
1011

1112
---
1213
"Add transient remote cluster based on the preset cluster and check remote info":
@@ -21,9 +22,13 @@
2122
flat_settings: true
2223
body:
2324
transient:
24-
cluster.remote.test_remote_cluster.seeds: $remote_ip
25+
cluster.remote.test_remote_cluster.mode: "sniff"
26+
cluster.remote.test_remote_cluster.sniff.node_connections: "2"
27+
cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip
2528

26-
- match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}}
29+
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"}
30+
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "2"}
31+
- match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip}
2732

2833
# we do another search here since this will enforce the connection to be established
2934
# otherwise the cluster might not have been connected yet.
@@ -45,19 +50,49 @@
4550
- match: { my_remote_cluster.seeds.0: $remote_ip }
4651

4752
- match: { my_remote_cluster.num_nodes_connected: 1}
48-
- match: { test_remote_cluster.num_nodes_connected: 1}
53+
- gt: { test_remote_cluster.num_nodes_connected: 0}
4954

5055
- match: { my_remote_cluster.max_connections_per_cluster: 1}
51-
- match: { test_remote_cluster.max_connections_per_cluster: 1}
56+
- match: { test_remote_cluster.max_connections_per_cluster: 2}
5257

5358
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
5459
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
5560

61+
- match: { my_remote_cluster.mode: "sniff" }
62+
- match: { test_remote_cluster.mode: "sniff" }
63+
64+
- do:
65+
cluster.put_settings:
66+
flat_settings: true
67+
body:
68+
transient:
69+
cluster.remote.test_remote_cluster.mode: "simple"
70+
cluster.remote.test_remote_cluster.sniff.seeds: null
71+
cluster.remote.test_remote_cluster.sniff.node_connections: null
72+
cluster.remote.test_remote_cluster.simple.socket_connections: "10"
73+
cluster.remote.test_remote_cluster.simple.addresses: $remote_ip
74+
75+
- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"}
76+
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "10"}
77+
- match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip}
78+
79+
- do:
80+
cluster.remote_info: {}
81+
82+
- match: { test_remote_cluster.connected: true }
83+
- match: { test_remote_cluster.addresses.0: $remote_ip }
84+
- gt: { test_remote_cluster.num_sockets_connected: 0}
85+
- match: { test_remote_cluster.max_socket_connections: 10}
86+
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
87+
- match: { test_remote_cluster.mode: "simple" }
88+
5689
- do:
5790
cluster.put_settings:
5891
body:
5992
transient:
60-
cluster.remote.test_remote_cluster.seeds: null
93+
cluster.remote.test_remote_cluster.mode: null
94+
cluster.remote.test_remote_cluster.simple.socket_connections: null
95+
cluster.remote.test_remote_cluster.simple.addresses: null
6196

6297
---
6398
"skip_unavailable is returned as part of _remote/info response":

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import java.io.Closeable;
3636
import java.io.IOException;
37-
import java.util.Collections;
3837
import java.util.function.Function;
3938

4039
/**
@@ -206,24 +205,7 @@ boolean isNodeConnected(final DiscoveryNode node) {
206205
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
207206
*/
208207
public RemoteConnectionInfo getConnectionInfo() {
209-
if (connectionStrategy instanceof SniffConnectionStrategy) {
210-
SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
211-
return new RemoteConnectionInfo(
212-
clusterAlias,
213-
sniffStrategy.getSeedNodes(),
214-
sniffStrategy.getMaxConnections(),
215-
getNumNodesConnected(),
216-
initialConnectionTimeout,
217-
skipUnavailable);
218-
} else {
219-
return new RemoteConnectionInfo(
220-
clusterAlias,
221-
Collections.emptyList(),
222-
0,
223-
getNumNodesConnected(),
224-
initialConnectionTimeout,
225-
skipUnavailable);
226-
}
208+
return new RemoteConnectionInfo(clusterAlias, connectionStrategy.getModeInfo(), initialConnectionTimeout, skipUnavailable);
227209
}
228210

229211
int getNumNodesConnected() {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.common.io.stream.Writeable;
@@ -36,63 +37,67 @@
3637
* {@code _remote/info} requests.
3738
*/
3839
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
39-
final List<String> seedNodes;
40-
final int connectionsPerCluster;
40+
41+
final ModeInfo modeInfo;
4142
final TimeValue initialConnectionTimeout;
42-
final int numNodesConnected;
4343
final String clusterAlias;
4444
final boolean skipUnavailable;
4545

46-
RemoteConnectionInfo(String clusterAlias, List<String> seedNodes,
47-
int connectionsPerCluster, int numNodesConnected,
48-
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
46+
RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) {
4947
this.clusterAlias = clusterAlias;
50-
this.seedNodes = seedNodes;
51-
this.connectionsPerCluster = connectionsPerCluster;
52-
this.numNodesConnected = numNodesConnected;
48+
this.modeInfo = modeInfo;
5349
this.initialConnectionTimeout = initialConnectionTimeout;
5450
this.skipUnavailable = skipUnavailable;
5551
}
5652

5753
public RemoteConnectionInfo(StreamInput input) throws IOException {
58-
seedNodes = Arrays.asList(input.readStringArray());
59-
connectionsPerCluster = input.readVInt();
60-
initialConnectionTimeout = input.readTimeValue();
61-
numNodesConnected = input.readVInt();
62-
clusterAlias = input.readString();
63-
skipUnavailable = input.readBoolean();
64-
}
65-
66-
public List<String> getSeedNodes() {
67-
return seedNodes;
68-
}
69-
70-
public int getConnectionsPerCluster() {
71-
return connectionsPerCluster;
72-
}
73-
74-
public TimeValue getInitialConnectionTimeout() {
75-
return initialConnectionTimeout;
54+
// TODO: Change to 7.6 after backport
55+
if (input.getVersion().onOrAfter(Version.V_8_0_0)) {
56+
RemoteConnectionStrategy.ConnectionStrategy mode = input.readEnum(RemoteConnectionStrategy.ConnectionStrategy.class);
57+
modeInfo = mode.getReader().read(input);
58+
initialConnectionTimeout = input.readTimeValue();
59+
clusterAlias = input.readString();
60+
skipUnavailable = input.readBoolean();
61+
} else {
62+
List<String> seedNodes = Arrays.asList(input.readStringArray());
63+
int connectionsPerCluster = input.readVInt();
64+
initialConnectionTimeout = input.readTimeValue();
65+
int numNodesConnected = input.readVInt();
66+
clusterAlias = input.readString();
67+
skipUnavailable = input.readBoolean();
68+
modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, connectionsPerCluster, numNodesConnected);
69+
}
7670
}
7771

78-
public int getNumNodesConnected() {
79-
return numNodesConnected;
72+
public boolean isConnected() {
73+
return modeInfo.isConnected();
8074
}
8175

8276
public String getClusterAlias() {
8377
return clusterAlias;
8478
}
8579

86-
public boolean isSkipUnavailable() {
87-
return skipUnavailable;
88-
}
89-
9080
@Override
9181
public void writeTo(StreamOutput out) throws IOException {
92-
out.writeStringArray(seedNodes.toArray(new String[0]));
93-
out.writeVInt(connectionsPerCluster);
94-
out.writeTimeValue(initialConnectionTimeout);
95-
out.writeVInt(numNodesConnected);
82+
// TODO: Change to 7.6 after backport
83+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
84+
out.writeEnum(modeInfo.modeType());
85+
modeInfo.writeTo(out);
86+
out.writeTimeValue(initialConnectionTimeout);
87+
} else {
88+
if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) {
89+
SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) this.modeInfo;
90+
out.writeStringArray(sniffInfo.seedNodes.toArray(new String[0]));
91+
out.writeVInt(sniffInfo.maxConnectionsPerCluster);
92+
out.writeTimeValue(initialConnectionTimeout);
93+
out.writeVInt(sniffInfo.numNodesConnected);
94+
} else {
95+
out.writeStringArray(new String[0]);
96+
out.writeVInt(0);
97+
out.writeTimeValue(initialConnectionTimeout);
98+
out.writeVInt(0);
99+
}
100+
}
96101
out.writeString(clusterAlias);
97102
out.writeBoolean(skipUnavailable);
98103
}
@@ -101,14 +106,9 @@ public void writeTo(StreamOutput out) throws IOException {
101106
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
102107
builder.startObject(clusterAlias);
103108
{
104-
builder.startArray("seeds");
105-
for (String addr : seedNodes) {
106-
builder.value(addr);
107-
}
108-
builder.endArray();
109-
builder.field("connected", numNodesConnected > 0);
110-
builder.field("num_nodes_connected", numNodesConnected);
111-
builder.field("max_connections_per_cluster", connectionsPerCluster);
109+
builder.field("connected", modeInfo.isConnected());
110+
builder.field("mode", modeInfo.modeName());
111+
modeInfo.toXContent(builder, params);
112112
builder.field("initial_connect_timeout", initialConnectionTimeout);
113113
builder.field("skip_unavailable", skipUnavailable);
114114
}
@@ -121,18 +121,23 @@ public boolean equals(Object o) {
121121
if (this == o) return true;
122122
if (o == null || getClass() != o.getClass()) return false;
123123
RemoteConnectionInfo that = (RemoteConnectionInfo) o;
124-
return connectionsPerCluster == that.connectionsPerCluster &&
125-
numNodesConnected == that.numNodesConnected &&
126-
Objects.equals(seedNodes, that.seedNodes) &&
124+
return skipUnavailable == that.skipUnavailable &&
125+
Objects.equals(modeInfo, that.modeInfo) &&
127126
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
128-
Objects.equals(clusterAlias, that.clusterAlias) &&
129-
skipUnavailable == that.skipUnavailable;
127+
Objects.equals(clusterAlias, that.clusterAlias);
130128
}
131129

132130
@Override
133131
public int hashCode() {
134-
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
135-
numNodesConnected, clusterAlias, skipUnavailable);
132+
return Objects.hash(modeInfo, initialConnectionTimeout, clusterAlias, skipUnavailable);
136133
}
137134

135+
public interface ModeInfo extends ToXContentFragment, Writeable {
136+
137+
boolean isConnected();
138+
139+
String modeName();
140+
141+
RemoteConnectionStrategy.ConnectionStrategy modeType();
142+
}
138143
}

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.ContextPreservingActionListener;
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
2929
import org.elasticsearch.common.collect.Tuple;
30+
import org.elasticsearch.common.io.stream.Writeable;
3031
import org.elasticsearch.common.settings.Setting;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.common.unit.TimeValue;
@@ -57,20 +58,39 @@
5758
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable {
5859

5960
enum ConnectionStrategy {
60-
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings),
61-
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings);
61+
SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings,
62+
SniffConnectionStrategy::infoReader) {
63+
@Override
64+
public String toString() {
65+
return "sniff";
66+
}
67+
},
68+
SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings,
69+
SimpleConnectionStrategy::infoReader) {
70+
@Override
71+
public String toString() {
72+
return "simple";
73+
}
74+
};
6275

6376
private final int numberOfChannels;
6477
private final Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings;
78+
private final Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader;
6579

66-
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings) {
80+
ConnectionStrategy(int numberOfChannels, Supplier<Stream<Setting.AffixSetting<?>>> enablementSettings,
81+
Supplier<Writeable.Reader<RemoteConnectionInfo.ModeInfo>> reader) {
6782
this.numberOfChannels = numberOfChannels;
6883
this.enablementSettings = enablementSettings;
84+
this.reader = reader;
6985
}
7086

7187
public int getNumberOfChannels() {
7288
return numberOfChannels;
7389
}
90+
91+
public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
92+
return reader.get();
93+
}
7494
}
7595

7696
public static final Setting.AffixSetting<ConnectionStrategy> REMOTE_CONNECTION_MODE = Setting.affixKeySetting(
@@ -310,6 +330,8 @@ boolean assertNoRunningConnections() {
310330

311331
protected abstract void connectImpl(ActionListener<Void> listener);
312332

333+
protected abstract RemoteConnectionInfo.ModeInfo getModeInfo();
334+
313335
private List<ActionListener<Void>> getAndClearListeners() {
314336
final List<ActionListener<Void>> result;
315337
synchronized (mutex) {

0 commit comments

Comments
 (0)