Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) {
String name = profileSettings.profileName;
if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts,
receivePredictorMin, receivePredictorMax);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -91,7 +90,7 @@ public void testScheduledPing() throws Exception {
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
Expand All @@ -36,6 +35,7 @@
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
Expand All @@ -58,9 +58,10 @@ public static MockTransportService nettyFromThreadPool(Settings settings, Thread
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
ActionListener<Version> listener) {
if (doHandshake) {
super.executeHandshake(node, channel, timeout, listener);
super.executeHandshake(node, channel, profile, listener);
} else {
listener.onResponse(version.minimumCompatibilityVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand All @@ -37,6 +36,7 @@
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
Expand All @@ -62,9 +62,10 @@ public static MockTransportService nioFromThreadPool(Settings settings, ThreadPo
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
ActionListener<Version> listener) {
if (doHandshake) {
super.executeHandshake(node, channel, timeout, listener);
super.executeHandshake(node, channel, profile, listener);
} else {
listener.onResponse(version.minimumCompatibilityVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,15 @@
*/
public final class ConnectionProfile {

/**
* Builds a connection profile that is dedicated to a single channel type. Use this
* when opening single use connections
*/
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType,
@Nullable TimeValue connectTimeout,
@Nullable TimeValue handshakeTimeout) {
Builder builder = new Builder();
builder.addConnections(1, channelType);
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
otherTypes.remove(channelType);
builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new));
if (connectTimeout != null) {
builder.setConnectTimeout(connectTimeout);
}
if (handshakeTimeout != null) {
builder.setHandshakeTimeout(handshakeTimeout);
}
return builder.build();
}

private final List<ConnectionTypeHandle> handles;
private final int numConnections;
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
TimeValue handshakeTimeout) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
}

/**
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
*/
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
Objects.requireNonNull(fallbackProfile);
if (profile == null) {
return fallbackProfile;
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null
&& profile.getCompressionEnabled() != null) {
return profile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
Expand All @@ -89,6 +56,9 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro
if (profile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
}
if (profile.getCompressionEnabled() == null) {
builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled());
}
return builder.build();
}
}
Expand All @@ -108,6 +78,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
Builder builder = new Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setCompressionEnabled(Transport.TRANSPORT_TCP_COMPRESS.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
Expand All @@ -118,13 +89,77 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
return builder.build();
}

/**
* Builds a connection profile that is dedicated to a single channel type. Use this
* when opening single use connections
*/
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) {
return buildSingleChannelProfile(channelType, null, null, null);
}

/**
* Builds a connection profile that is dedicated to a single channel type. Allows passing compression
* settings.
*/
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, boolean compressionEnabled) {
return buildSingleChannelProfile(channelType, null, null, compressionEnabled);
}

/**
* Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
* handshake timeouts.
*/
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
@Nullable TimeValue handshakeTimeout) {
return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null);
}

/**
* Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
* handshake timeouts and compression settings.
*/
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
@Nullable TimeValue handshakeTimeout, @Nullable Boolean compressionEnabled) {
Builder builder = new Builder();
builder.addConnections(1, channelType);
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
otherTypes.remove(channelType);
builder.addConnections(0, otherTypes.toArray(new TransportRequestOptions.Type[0]));
if (connectTimeout != null) {
builder.setConnectTimeout(connectTimeout);
}
if (handshakeTimeout != null) {
builder.setHandshakeTimeout(handshakeTimeout);
}
if (compressionEnabled != null) {
builder.setCompressionEnabled(compressionEnabled);
}
return builder.build();
}

private final List<ConnectionTypeHandle> handles;
private final int numConnections;
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;
private final Boolean compressionEnabled;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
TimeValue handshakeTimeout, Boolean compressionEnabled) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
this.compressionEnabled = compressionEnabled;
}

/**
* A builder to build a new {@link ConnectionProfile}
*/
public static class Builder {
private final List<ConnectionTypeHandle> handles = new ArrayList<>();
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
private int offset = 0;
private int numConnections = 0;
private Boolean compressionEnabled;
private TimeValue connectTimeout;
private TimeValue handshakeTimeout;

Expand All @@ -135,10 +170,11 @@ public Builder() {
/** copy constructor, using another profile as a base */
public Builder(ConnectionProfile source) {
handles.addAll(source.getHandles());
offset = source.getNumConnections();
numConnections = source.getNumConnections();
handles.forEach(th -> addedTypes.addAll(th.types));
connectTimeout = source.getConnectTimeout();
handshakeTimeout = source.getHandshakeTimeout();
compressionEnabled = source.getCompressionEnabled();
}
/**
* Sets a connect timeout for this connection profile
Expand All @@ -160,6 +196,13 @@ public void setHandshakeTimeout(TimeValue handshakeTimeout) {
this.handshakeTimeout = handshakeTimeout;
}

/**
* Sets compression enabled for this connection profile
*/
public void setCompressionEnabled(boolean compressionEnabled) {
this.compressionEnabled = compressionEnabled;
}

/**
* Adds a number of connections for one or more types. Each type can only be added once.
* @param numConnections the number of connections to use in the pool for the given connection types
Expand All @@ -175,8 +218,8 @@ public void addConnections(int numConnections, TransportRequestOptions.Type... t
}
}
addedTypes.addAll(Arrays.asList(types));
handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types))));
offset += numConnections;
handles.add(new ConnectionTypeHandle(this.numConnections, numConnections, EnumSet.copyOf(Arrays.asList(types))));
this.numConnections += numConnections;
}

/**
Expand All @@ -189,7 +232,8 @@ public ConnectionProfile build() {
if (types.isEmpty() == false) {
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
}
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout);
return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout,
compressionEnabled);
}

}
Expand All @@ -208,6 +252,14 @@ public TimeValue getHandshakeTimeout() {
return handshakeTimeout;
}

/**
* Returns boolean indicating if compression is enabled or <code>null</code> if no explicit compression
* is set on this profile.
*/
public Boolean getCompressionEnabled() {
return compressionEnabled;
}

/**
* Returns the total number of connections for this profile
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;

/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
* current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
Expand All @@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final ConnectionProfile remoteProfile;
private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final boolean compress;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
Expand All @@ -108,12 +111,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param proxyAddress the proxy address
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
this.compress = REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
Expand All @@ -122,6 +126,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY);
builder.setCompressionEnabled(compress);
remoteProfile = builder.build();
connectedNodes = new ConnectedNodes(clusterAlias);
this.seedNodes = Collections.unmodifiableList(seedNodes);
Expand Down Expand Up @@ -471,8 +476,8 @@ protected void doRun() {
});
}

private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, final TransportService transportService,
final ConnectionManager manager, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
Expand All @@ -483,8 +488,9 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
proxyAddress);
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG,
compress);
Transport.Connection connection = manager.openConnection(seedNode, profile);
boolean success = false;
try {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public String getKey(final String key) {
key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
"cluster.remote.",
"transport.compress",
key -> boolSetting(key, Transport.TRANSPORT_TCP_COMPRESS, Setting.Property.NodeScope),
REMOTE_CLUSTERS_SEEDS);

private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());

Expand Down
Loading