Skip to content

Commit 2464b68

Browse files
authored
Move connection profile into connection manager (#32858)
This is related to #31835. It moves the default connection profile into the ConnectionManager class. The will allow us to have different connection managers with different profiles.
1 parent 51cece1 commit 2464b68

File tree

16 files changed

+184
-179
lines changed

16 files changed

+184
-179
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.elasticsearch.indices.breaker.CircuitBreakerService;
5555
import org.elasticsearch.threadpool.ThreadPool;
5656
import org.elasticsearch.transport.TcpTransport;
57-
import org.elasticsearch.transport.TransportRequestOptions;
5857

5958
import java.io.IOException;
6059
import java.net.InetSocketAddress;
@@ -147,7 +146,6 @@ private Bootstrap createBootstrap() {
147146

148147
bootstrap.handler(getClientChannelInitializer());
149148

150-
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
151149
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
152150
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
153151

@@ -175,14 +173,8 @@ private void createServerBootstrap(ProfileSettings profileSettings) {
175173
String name = profileSettings.profileName;
176174
if (logger.isDebugEnabled()) {
177175
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
178-
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
176+
+ "receive_predictor[{}->{}]",
179177
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
180-
defaultConnectionProfile.getConnectTimeout(),
181-
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
182-
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
183-
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
184-
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
185-
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
186178
receivePredictorMin, receivePredictorMax);
187179
}
188180

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,13 +297,13 @@ public void apply(Settings value, Settings current, Settings previous) {
297297
TcpTransport.TCP_REUSE_ADDRESS_PROFILE,
298298
TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE,
299299
TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE,
300-
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
301-
TcpTransport.CONNECTIONS_PER_NODE_BULK,
302-
TcpTransport.CONNECTIONS_PER_NODE_REG,
303-
TcpTransport.CONNECTIONS_PER_NODE_STATE,
304-
TcpTransport.CONNECTIONS_PER_NODE_PING,
300+
TransportService.CONNECTIONS_PER_NODE_RECOVERY,
301+
TransportService.CONNECTIONS_PER_NODE_BULK,
302+
TransportService.CONNECTIONS_PER_NODE_REG,
303+
TransportService.CONNECTIONS_PER_NODE_STATE,
304+
TransportService.CONNECTIONS_PER_NODE_PING,
305+
TransportService.TCP_CONNECT_TIMEOUT,
305306
TcpTransport.PING_SCHEDULE,
306-
TcpTransport.TCP_CONNECT_TIMEOUT,
307307
NetworkService.NETWORK_SERVER,
308308
TcpTransport.TCP_NO_DELAY,
309309
TcpTransport.TCP_KEEP_ALIVE,

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,21 @@ public class ConnectionManager implements Closeable {
6060
private final Transport transport;
6161
private final ThreadPool threadPool;
6262
private final TimeValue pingSchedule;
63+
private final ConnectionProfile defaultProfile;
6364
private final Lifecycle lifecycle = new Lifecycle();
6465
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
6566
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
6667

6768
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
69+
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
70+
}
71+
72+
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
6873
this.logger = Loggers.getLogger(getClass(), settings);
6974
this.transport = transport;
7075
this.threadPool = threadPool;
7176
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
77+
this.defaultProfile = defaultProfile;
7278
this.lifecycle.moveToStarted();
7379

7480
if (pingSchedule.millis() > 0) {
@@ -84,13 +90,18 @@ public void removeListener(TransportConnectionListener listener) {
8490
this.connectionListener.listeners.remove(listener);
8591
}
8692

93+
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
94+
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
95+
}
96+
8797
/**
8898
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
8999
* Once a successful is established, it can be validated before being exposed.
90100
*/
91101
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
92102
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
93103
throws ConnectTransportException {
104+
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
94105
if (node == null) {
95106
throw new ConnectTransportException(null, "can't connect to a null node");
96107
}
@@ -104,8 +115,8 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
104115
}
105116
boolean success = false;
106117
try {
107-
connection = transport.openConnection(node, connectionProfile);
108-
connectionValidator.accept(connection, connectionProfile);
118+
connection = transport.openConnection(node, resolvedProfile);
119+
connectionValidator.accept(connection, resolvedProfile);
109120
// we acquire a connection lock, so no way there is an existing connection
110121
connectedNodes.put(node, connection);
111122
if (logger.isDebugEnabled()) {
@@ -279,4 +290,23 @@ public void onNodeConnected(DiscoveryNode node) {
279290
}
280291
}
281292
}
293+
294+
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
295+
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
296+
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
297+
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
298+
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
299+
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
300+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
301+
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
302+
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
303+
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
304+
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
305+
// if we are not master eligible we don't need a dedicated channel to publish the state
306+
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
307+
// if we are not a data-node we don't need any dedicated channels for recovery
308+
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
309+
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
310+
return builder.build();
311+
}
282312
}

server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collections;
2727
import java.util.EnumSet;
2828
import java.util.List;
29+
import java.util.Objects;
2930
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132

@@ -61,14 +62,35 @@ public static ConnectionProfile buildSingleChannelProfile(TransportRequestOption
6162
private final TimeValue connectTimeout;
6263
private final TimeValue handshakeTimeout;
6364

64-
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout)
65-
{
65+
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
66+
TimeValue handshakeTimeout) {
6667
this.handles = handles;
6768
this.numConnections = numConnections;
6869
this.connectTimeout = connectTimeout;
6970
this.handshakeTimeout = handshakeTimeout;
7071
}
7172

73+
/**
74+
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
75+
*/
76+
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
77+
Objects.requireNonNull(fallbackProfile);
78+
if (profile == null) {
79+
return fallbackProfile;
80+
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
81+
return profile;
82+
} else {
83+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
84+
if (profile.getConnectTimeout() == null) {
85+
builder.setConnectTimeout(fallbackProfile.getConnectTimeout());
86+
}
87+
if (profile.getHandshakeTimeout() == null) {
88+
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
89+
}
90+
return builder.build();
91+
}
92+
}
93+
7294
/**
7395
* A builder to build a new {@link ConnectionProfile}
7496
*/

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
108108
this.nodePredicate = nodePredicate;
109109
this.clusterAlias = clusterAlias;
110110
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
111-
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
112-
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
111+
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
112+
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
113113
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
114114
builder.addConnections(0, // we don't want this to be used for anything else but search
115115
TransportRequestOptions.Type.BULK,

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 8 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.action.support.PlainActionFuture;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.common.Booleans;
32-
import org.elasticsearch.common.Nullable;
3332
import org.elasticsearch.common.Strings;
3433
import org.elasticsearch.common.breaker.CircuitBreaker;
3534
import org.elasticsearch.common.bytes.BytesArray;
@@ -135,30 +134,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
135134
// the scheduled internal ping interval setting, defaults to disabled (-1)
136135
public static final Setting<TimeValue> PING_SCHEDULE =
137136
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
138-
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
139-
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
140-
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
141-
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
142-
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
143-
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
144-
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
145-
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
146-
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
147-
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
148-
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
149-
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
150137
public static final Setting<Boolean> TCP_NO_DELAY =
151138
boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
152139
public static final Setting<Boolean> TCP_KEEP_ALIVE =
153140
boolSetting("transport.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
154141
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
155142
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
156143
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
157-
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE,
158-
Setting.Property.NodeScope);
144+
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
159145
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
160-
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE,
161-
Setting.Property.NodeScope);
146+
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);
162147

163148

164149
public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
@@ -213,7 +198,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
213198
protected final boolean compress;
214199
private volatile BoundTransportAddress boundAddress;
215200
private final String transportName;
216-
protected final ConnectionProfile defaultConnectionProfile;
217201

218202
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
219203
private final CounterMetric numHandshakes = new CounterMetric();
@@ -237,7 +221,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
237221
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
238222
this.networkService = networkService;
239223
this.transportName = transportName;
240-
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
241224
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
242225
if (defaultFeatures == null) {
243226
this.features = new String[0];
@@ -261,25 +244,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
261244
}
262245
}
263246

264-
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
265-
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
266-
int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
267-
int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
268-
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
269-
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
270-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
271-
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
272-
builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
273-
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
274-
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
275-
// if we are not master eligible we don't need a dedicated channel to publish the state
276-
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
277-
// if we are not a data-node we don't need any dedicated channels for recovery
278-
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
279-
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
280-
return builder.build();
281-
}
282-
283247
@Override
284248
protected void doStart() {
285249
}
@@ -456,41 +420,21 @@ public void sendRequest(long requestId, String action, TransportRequest request,
456420
}
457421
}
458422

459-
/**
460-
* takes a {@link ConnectionProfile} that have been passed as a parameter to the public methods
461-
* and resolves it to a fully specified (i.e., no nulls) profile
462-
*/
463-
protected static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile connectionProfile,
464-
ConnectionProfile defaultConnectionProfile) {
465-
Objects.requireNonNull(defaultConnectionProfile);
466-
if (connectionProfile == null) {
467-
return defaultConnectionProfile;
468-
} else if (connectionProfile.getConnectTimeout() != null && connectionProfile.getHandshakeTimeout() != null) {
469-
return connectionProfile;
470-
} else {
471-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(connectionProfile);
472-
if (connectionProfile.getConnectTimeout() == null) {
473-
builder.setConnectTimeout(defaultConnectionProfile.getConnectTimeout());
474-
}
475-
if (connectionProfile.getHandshakeTimeout() == null) {
476-
builder.setHandshakeTimeout(defaultConnectionProfile.getHandshakeTimeout());
477-
}
478-
return builder.build();
479-
}
480-
}
481-
482-
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
483-
return resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
423+
// This allows transport implementations to potentially override specific connection profiles. This
424+
// primarily exists for the test implementations.
425+
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
426+
return connectionProfile;
484427
}
485428

486429
@Override
487430
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
431+
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
488432
if (node == null) {
489433
throw new ConnectTransportException(null, "can't open connection to a null node");
490434
}
491435
boolean success = false;
492436
NodeChannels nodeChannels = null;
493-
connectionProfile = resolveConnectionProfile(connectionProfile);
437+
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
494438
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
495439
try {
496440
ensureOpen();

0 commit comments

Comments
 (0)