Skip to content

Commit 93c2c60

Browse files
authored
Move compression config to ConnectionProfile (#35357)
This is related to #34483. It introduces a namespaced setting for compression that allows users to configure compression on a per remote cluster basis. The transport.tcp.compress remains as a fallback setting. If transport.tcp.compress is set to true, then all requests and responses are compressed. If it is set to false, only requests to clusters based on the cluster.remote.cluster_name.transport.compress setting are compressed. However, after this change regardless of any local settings, responses will be compressed if the request that is received was compressed.
1 parent 5c2a5f2 commit 93c2c60

File tree

29 files changed

+372
-303
lines changed

29 files changed

+372
-303
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,8 @@ private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
169169
private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) {
170170
String name = profileSettings.profileName;
171171
if (logger.isDebugEnabled()) {
172-
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
173-
+ "receive_predictor[{}->{}]",
174-
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
172+
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
173+
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts,
175174
receivePredictorMin, receivePredictorMax);
176175
}
177176

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.transport.TransportRequestOptions;
3939
import org.elasticsearch.transport.TransportResponse;
4040
import org.elasticsearch.transport.TransportResponseHandler;
41-
import org.elasticsearch.transport.TransportResponseOptions;
4241
import org.elasticsearch.transport.TransportService;
4342

4443
import java.io.IOException;
@@ -91,7 +90,7 @@ public void testScheduledPing() throws Exception {
9190
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
9291
(request, channel, task) -> {
9392
try {
94-
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
93+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
9594
} catch (IOException e) {
9695
logger.error("Unexpected failure", e);
9796
fail(e.getMessage());

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.settings.ClusterSettings;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.transport.TransportAddress;
30-
import org.elasticsearch.common.unit.TimeValue;
3130
import org.elasticsearch.common.util.BigArrays;
3231
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3332
import org.elasticsearch.node.Node;
@@ -36,6 +35,7 @@
3635
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3736
import org.elasticsearch.transport.BindTransportException;
3837
import org.elasticsearch.transport.ConnectTransportException;
38+
import org.elasticsearch.transport.ConnectionProfile;
3939
import org.elasticsearch.transport.TcpChannel;
4040
import org.elasticsearch.transport.TcpTransport;
4141
import org.elasticsearch.transport.Transport;
@@ -58,9 +58,10 @@ public static MockTransportService nettyFromThreadPool(Settings settings, Thread
5858
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
5959

6060
@Override
61-
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
61+
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
62+
ActionListener<Version> listener) {
6263
if (doHandshake) {
63-
super.executeHandshake(node, channel, timeout, listener);
64+
super.executeHandshake(node, channel, profile, listener);
6465
} else {
6566
listener.onResponse(version.minimumCompatibilityVersion());
6667
}

plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.settings.ClusterSettings;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.transport.TransportAddress;
30-
import org.elasticsearch.common.unit.TimeValue;
3130
import org.elasticsearch.common.util.BigArrays;
3231
import org.elasticsearch.common.util.MockPageCacheRecycler;
3332
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -37,6 +36,7 @@
3736
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3837
import org.elasticsearch.transport.BindTransportException;
3938
import org.elasticsearch.transport.ConnectTransportException;
39+
import org.elasticsearch.transport.ConnectionProfile;
4040
import org.elasticsearch.transport.TcpChannel;
4141
import org.elasticsearch.transport.TcpTransport;
4242
import org.elasticsearch.transport.Transport;
@@ -62,9 +62,10 @@ public static MockTransportService nioFromThreadPool(Settings settings, ThreadPo
6262
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
6363

6464
@Override
65-
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
65+
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
66+
ActionListener<Version> listener) {
6667
if (doHandshake) {
67-
super.executeHandshake(node, channel, timeout, listener);
68+
super.executeHandshake(node, channel, profile, listener);
6869
} else {
6970
listener.onResponse(version.minimumCompatibilityVersion());
7071
}

server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

Lines changed: 92 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,48 +38,15 @@
3838
*/
3939
public final class ConnectionProfile {
4040

41-
/**
42-
* Builds a connection profile that is dedicated to a single channel type. Use this
43-
* when opening single use connections
44-
*/
45-
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType,
46-
@Nullable TimeValue connectTimeout,
47-
@Nullable TimeValue handshakeTimeout) {
48-
Builder builder = new Builder();
49-
builder.addConnections(1, channelType);
50-
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
51-
otherTypes.remove(channelType);
52-
builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new));
53-
if (connectTimeout != null) {
54-
builder.setConnectTimeout(connectTimeout);
55-
}
56-
if (handshakeTimeout != null) {
57-
builder.setHandshakeTimeout(handshakeTimeout);
58-
}
59-
return builder.build();
60-
}
61-
62-
private final List<ConnectionTypeHandle> handles;
63-
private final int numConnections;
64-
private final TimeValue connectTimeout;
65-
private final TimeValue handshakeTimeout;
66-
67-
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
68-
TimeValue handshakeTimeout) {
69-
this.handles = handles;
70-
this.numConnections = numConnections;
71-
this.connectTimeout = connectTimeout;
72-
this.handshakeTimeout = handshakeTimeout;
73-
}
74-
7541
/**
7642
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
7743
*/
7844
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
7945
Objects.requireNonNull(fallbackProfile);
8046
if (profile == null) {
8147
return fallbackProfile;
82-
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
48+
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null
49+
&& profile.getCompressionEnabled() != null) {
8350
return profile;
8451
} else {
8552
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
@@ -89,6 +56,9 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro
8956
if (profile.getHandshakeTimeout() == null) {
9057
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
9158
}
59+
if (profile.getCompressionEnabled() == null) {
60+
builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled());
61+
}
9262
return builder.build();
9363
}
9464
}
@@ -108,6 +78,7 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
10878
Builder builder = new Builder();
10979
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
11080
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
81+
builder.setCompressionEnabled(Transport.TRANSPORT_TCP_COMPRESS.get(settings));
11182
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
11283
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
11384
// if we are not master eligible we don't need a dedicated channel to publish the state
@@ -118,13 +89,77 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
11889
return builder.build();
11990
}
12091

92+
/**
93+
* Builds a connection profile that is dedicated to a single channel type. Use this
94+
* when opening single use connections
95+
*/
96+
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) {
97+
return buildSingleChannelProfile(channelType, null, null, null);
98+
}
99+
100+
/**
101+
* Builds a connection profile that is dedicated to a single channel type. Allows passing compression
102+
* settings.
103+
*/
104+
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, boolean compressionEnabled) {
105+
return buildSingleChannelProfile(channelType, null, null, compressionEnabled);
106+
}
107+
108+
/**
109+
* Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
110+
* handshake timeouts.
111+
*/
112+
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
113+
@Nullable TimeValue handshakeTimeout) {
114+
return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null);
115+
}
116+
117+
/**
118+
* Builds a connection profile that is dedicated to a single channel type. Allows passing connection and
119+
* handshake timeouts and compression settings.
120+
*/
121+
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
122+
@Nullable TimeValue handshakeTimeout, @Nullable Boolean compressionEnabled) {
123+
Builder builder = new Builder();
124+
builder.addConnections(1, channelType);
125+
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
126+
otherTypes.remove(channelType);
127+
builder.addConnections(0, otherTypes.toArray(new TransportRequestOptions.Type[0]));
128+
if (connectTimeout != null) {
129+
builder.setConnectTimeout(connectTimeout);
130+
}
131+
if (handshakeTimeout != null) {
132+
builder.setHandshakeTimeout(handshakeTimeout);
133+
}
134+
if (compressionEnabled != null) {
135+
builder.setCompressionEnabled(compressionEnabled);
136+
}
137+
return builder.build();
138+
}
139+
140+
private final List<ConnectionTypeHandle> handles;
141+
private final int numConnections;
142+
private final TimeValue connectTimeout;
143+
private final TimeValue handshakeTimeout;
144+
private final Boolean compressionEnabled;
145+
146+
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
147+
TimeValue handshakeTimeout, Boolean compressionEnabled) {
148+
this.handles = handles;
149+
this.numConnections = numConnections;
150+
this.connectTimeout = connectTimeout;
151+
this.handshakeTimeout = handshakeTimeout;
152+
this.compressionEnabled = compressionEnabled;
153+
}
154+
121155
/**
122156
* A builder to build a new {@link ConnectionProfile}
123157
*/
124158
public static class Builder {
125159
private final List<ConnectionTypeHandle> handles = new ArrayList<>();
126160
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
127-
private int offset = 0;
161+
private int numConnections = 0;
162+
private Boolean compressionEnabled;
128163
private TimeValue connectTimeout;
129164
private TimeValue handshakeTimeout;
130165

@@ -135,10 +170,11 @@ public Builder() {
135170
/** copy constructor, using another profile as a base */
136171
public Builder(ConnectionProfile source) {
137172
handles.addAll(source.getHandles());
138-
offset = source.getNumConnections();
173+
numConnections = source.getNumConnections();
139174
handles.forEach(th -> addedTypes.addAll(th.types));
140175
connectTimeout = source.getConnectTimeout();
141176
handshakeTimeout = source.getHandshakeTimeout();
177+
compressionEnabled = source.getCompressionEnabled();
142178
}
143179
/**
144180
* Sets a connect timeout for this connection profile
@@ -160,6 +196,13 @@ public void setHandshakeTimeout(TimeValue handshakeTimeout) {
160196
this.handshakeTimeout = handshakeTimeout;
161197
}
162198

199+
/**
200+
* Sets compression enabled for this connection profile
201+
*/
202+
public void setCompressionEnabled(boolean compressionEnabled) {
203+
this.compressionEnabled = compressionEnabled;
204+
}
205+
163206
/**
164207
* Adds a number of connections for one or more types. Each type can only be added once.
165208
* @param numConnections the number of connections to use in the pool for the given connection types
@@ -175,8 +218,8 @@ public void addConnections(int numConnections, TransportRequestOptions.Type... t
175218
}
176219
}
177220
addedTypes.addAll(Arrays.asList(types));
178-
handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types))));
179-
offset += numConnections;
221+
handles.add(new ConnectionTypeHandle(this.numConnections, numConnections, EnumSet.copyOf(Arrays.asList(types))));
222+
this.numConnections += numConnections;
180223
}
181224

182225
/**
@@ -189,7 +232,8 @@ public ConnectionProfile build() {
189232
if (types.isEmpty() == false) {
190233
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
191234
}
192-
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout);
235+
return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout,
236+
compressionEnabled);
193237
}
194238

195239
}
@@ -208,6 +252,14 @@ public TimeValue getHandshakeTimeout() {
208252
return handshakeTimeout;
209253
}
210254

255+
/**
256+
* Returns boolean indicating if compression is enabled or <code>null</code> if no explicit compression
257+
* is set on this profile.
258+
*/
259+
public Boolean getCompressionEnabled() {
260+
return compressionEnabled;
261+
}
262+
211263
/**
212264
* Returns the total number of connections for this profile
213265
*/

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import java.util.function.Supplier;
6767
import java.util.stream.Collectors;
6868

69+
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
70+
6971
/**
7072
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
7173
* current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
@@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8688
private final ConnectionProfile remoteProfile;
8789
private final ConnectedNodes connectedNodes;
8890
private final String clusterAlias;
91+
private final boolean compress;
8992
private final int maxNumRemoteConnections;
9093
private final Predicate<DiscoveryNode> nodePredicate;
9194
private final ThreadPool threadPool;
@@ -108,12 +111,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
108111
* @param proxyAddress the proxy address
109112
*/
110113
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
111-
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
112-
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
114+
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
115+
Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
113116
this.transportService = transportService;
114117
this.maxNumRemoteConnections = maxNumRemoteConnections;
115118
this.nodePredicate = nodePredicate;
116119
this.clusterAlias = clusterAlias;
120+
this.compress = REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
117121
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
118122
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
119123
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
@@ -122,6 +126,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
122126
TransportRequestOptions.Type.BULK,
123127
TransportRequestOptions.Type.STATE,
124128
TransportRequestOptions.Type.RECOVERY);
129+
builder.setCompressionEnabled(compress);
125130
remoteProfile = builder.build();
126131
connectedNodes = new ConnectedNodes(clusterAlias);
127132
this.seedNodes = Collections.unmodifiableList(seedNodes);
@@ -471,8 +476,8 @@ protected void doRun() {
471476
});
472477
}
473478

474-
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
475-
final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
479+
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, final TransportService transportService,
480+
final ConnectionManager manager, ActionListener<Void> listener) {
476481
if (Thread.currentThread().isInterrupted()) {
477482
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
478483
}
@@ -483,8 +488,9 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
483488
logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode,
484489
proxyAddress);
485490
final TransportService.HandshakeResponse handshakeResponse;
486-
Transport.Connection connection = manager.openConnection(seedNode,
487-
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
491+
ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG,
492+
compress);
493+
Transport.Connection connection = manager.openConnection(seedNode, profile);
488494
boolean success = false;
489495
try {
490496
try {

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public String getKey(final String key) {
173173
key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope),
174174
REMOTE_CLUSTERS_SEEDS);
175175

176+
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
177+
"cluster.remote.",
178+
"transport.compress",
179+
key -> boolSetting(key, Transport.TRANSPORT_TCP_COMPRESS, Setting.Property.NodeScope),
180+
REMOTE_CLUSTERS_SEEDS);
181+
176182
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
177183
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
178184

0 commit comments

Comments
 (0)