Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.node_connections: "5"
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
Expand All @@ -29,7 +29,7 @@
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: $remote_ip
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
Expand Down Expand Up @@ -64,12 +64,12 @@
flat_settings: true
body:
transient:
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip
cluster.remote.test_remote_cluster.seeds: $remote_ip

- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_addresses\" cannot be
- match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be
used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" }

---
Expand All @@ -87,11 +87,11 @@
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_socket_connections: "3"
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}

- do:
search:
Expand Down Expand Up @@ -179,7 +179,7 @@
body:
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
Expand All @@ -193,10 +193,10 @@
transient:
cluster.remote.test_remote_cluster.mode: "proxy"
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}

- do:
search:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@
cluster.remote.test_remote_cluster.seeds: null
cluster.remote.test_remote_cluster.node_connections: null
cluster.remote.test_remote_cluster.proxy_socket_connections: "10"
cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip
cluster.remote.test_remote_cluster.proxy_address: $remote_ip

- match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "10"}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip}
- match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip}

- do:
cluster.remote_info: {}

- match: { test_remote_cluster.connected: true }
- match: { test_remote_cluster.addresses.0: $remote_ip }
- match: { test_remote_cluster.address: $remote_ip }
- gt: { test_remote_cluster.num_sockets_connected: 0}
- match: { test_remote_cluster.max_socket_connections: 10}
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
Expand All @@ -92,7 +92,7 @@
transient:
cluster.remote.test_remote_cluster.mode: null
cluster.remote.test_remote_cluster.proxy_socket_connections: null
cluster.remote.test_remote_cluster.proxy_addresses: null
cluster.remote.test_remote_cluster.proxy_address: null

---
"skip_unavailable is returned as part of _remote/info response":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -37,18 +38,14 @@
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;
Expand All @@ -57,18 +54,16 @@
public class ProxyConnectionStrategy extends RemoteConnectionStrategy {

/**
* A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin
* fashion.
* The remote address for the proxy. The connections will be opened to the configured address.
*/
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting(
"cluster.remote.",
"proxy_addresses",
(ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> {
// validate address
parsePort(s);
return s;
}, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY),
Setting.Property.Dynamic, Setting.Property.NodeScope));
"proxy_address",
(ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> {
if (Strings.hasLength(s)) {
parsePort(s);
}
}), Setting.Property.Dynamic, Setting.Property.NodeScope));

/**
* The maximum number of socket connections that will be established to a remote cluster. The default is 18.
Expand All @@ -95,9 +90,9 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {

private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final String configuredAddress;
private final boolean includeServerName;
private final List<Supplier<TransportAddress>> addresses;
private final Supplier<TransportAddress> address;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
private final ConnectionManager.ConnectionValidator clusterNameValidator;
Expand All @@ -114,28 +109,26 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), false);
int maxNumConnections, String configuredAddress) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), false);
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, boolean includeServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses,
configuredAddresses.stream().map(address ->
(Supplier<TransportAddress>) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName);
int maxNumConnections, String configuredAddress, boolean includeServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), includeServerName);
}

ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses,
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
boolean includeServerName) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
this.configuredAddress = configuredAddress;
this.includeServerName = includeServerName;
assert addresses.isEmpty() == false : "Cannot use proxy connection strategy with no configured addresses";
this.addresses = addresses;
assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses";
this.address = address;
// TODO: Move into the ConnectionManager
this.profile = new ConnectionProfile.Builder()
.addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
Expand Down Expand Up @@ -172,9 +165,9 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false;
}

@Override
Expand All @@ -189,7 +182,7 @@ protected void connectImpl(ActionListener<Void> listener) {

@Override
public RemoteConnectionInfo.ModeInfo getModeInfo() {
return new ProxyModeInfo(configuredAddresses, maxNumConnections, connectionManager.size());
return new ProxyModeInfo(configuredAddress, maxNumConnections, connectionManager.size());
}

private void performProxyConnectionProcess(ActionListener<Void> listener) {
Expand All @@ -198,7 +191,7 @@ private void performProxyConnectionProcess(ActionListener<Void> listener) {

private void openConnections(ActionListener<Void> finished, int attemptNumber) {
if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) {
List<TransportAddress> resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList());
TransportAddress resolved = address.get();

int remaining = maxNumConnections - connectionManager.size();
ActionListener<Void> compositeListener = new ActionListener<>() {
Expand Down Expand Up @@ -228,15 +221,14 @@ public void onFailure(Exception e) {


for (int i = 0; i < remaining; ++i) {
TransportAddress address = nextAddress(resolved);
String id = clusterAlias + "#" + address;
String id = clusterAlias + "#" + resolved;
Map<String, String> attributes;
if (includeServerName) {
attributes = Collections.singletonMap("server_name", address.address().getHostString());
attributes = Collections.singletonMap("server_name", resolved.address().getHostString());
} else {
attributes = Collections.emptyMap();
}
DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT.minimumCompatibilityVersion());

connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() {
Expand All @@ -248,7 +240,7 @@ public void onResponse(Void v) {
@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]",
clusterAlias, address), e);
clusterAlias, resolved), e);
compositeListener.onFailure(e);
}
});
Expand Down Expand Up @@ -276,48 +268,35 @@ private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseConfiguredAddress(address));
}

private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
if (oldAddresses.size() != newAddresses.size()) {
return true;
}
Set<String> oldSeeds = new HashSet<>(oldAddresses);
Set<String> newSeeds = new HashSet<>(newAddresses);
return oldSeeds.equals(newSeeds) == false;
}

static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {

private final List<String> addresses;
private final String address;
private final int maxSocketConnections;
private final int numSocketsConnected;

ProxyModeInfo(List<String> addresses, int maxSocketConnections, int numSocketsConnected) {
this.addresses = addresses;
ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
this.address = address;
this.maxSocketConnections = maxSocketConnections;
this.numSocketsConnected = numSocketsConnected;
}

private ProxyModeInfo(StreamInput input) throws IOException {
addresses = Arrays.asList(input.readStringArray());
address = input.readString();
maxSocketConnections = input.readVInt();
numSocketsConnected = input.readVInt();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray("addresses");
for (String address : addresses) {
builder.value(address);
}
builder.endArray();
builder.field("address", address);
builder.field("num_sockets_connected", numSocketsConnected);
builder.field("max_socket_connections", maxSocketConnections);
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(addresses.toArray(new String[0]));
out.writeString(address);
out.writeVInt(maxSocketConnections);
out.writeVInt(numSocketsConnected);
}
Expand All @@ -344,12 +323,12 @@ public boolean equals(Object o) {
ProxyModeInfo otherProxy = (ProxyModeInfo) o;
return maxSocketConnections == otherProxy.maxSocketConnections &&
numSocketsConnected == otherProxy.numSocketsConnected &&
Objects.equals(addresses, otherProxy.addresses);
Objects.equals(address, otherProxy.address);
}

@Override
public int hashCode() {
return Objects.hash(addresses, maxSocketConnections, numSocketsConnected);
return Objects.hash(address, maxSocketConnections, numSocketsConnected);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -159,9 +160,8 @@ public static boolean isConnectionEnabled(String clusterAlias, Settings settings
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
return seeds.isEmpty() == false;
} else {
List<String> addresses = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias)
.get(settings);
return addresses.isEmpty() == false;
String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings);
return Strings.isEmpty(address) == false;
}
}

Expand Down
Loading