Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable {
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
Expand Down Expand Up @@ -323,23 +323,4 @@ public void onConnectionClosed(Transport.Connection connection) {
}
}
}

public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
Expand Down Expand Up @@ -91,6 +93,31 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro
}
}

/**
* Builds a default connection profile based on the provided settings.
*
* @param settings to build the connection profile from
* @return the connection profile
*/
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
Builder builder = new Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}

/**
* A builder to build a new {@link ConnectionProfile}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void stopThreadPool() {
}

public void testConnectionProfileResolve() {
final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));

final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Expand Down Expand Up @@ -96,31 +96,31 @@ public void testConnectionProfileResolve() {
}

public void testDefaultConnectionProfile() {
ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(13, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
assertEquals(12, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
assertEquals(11, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));

profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
.put("node.master", false).build());
assertEquals(10, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);

protected int channelsPerNodeConnection() {
return 13;
// This is a customized profile for this test case.
return 6;
}

@Override
Expand All @@ -125,9 +126,17 @@ public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
Settings connectionSettings = Settings.builder()
.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2)
.put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
.put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1)
.build();

serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates
nodeA = serviceA.getLocalNode();
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
nodeB = serviceB.getLocalNode();
// wait till all nodes are properly connected and the event has been sent, so tests in this class
// will not get this callback called on the connections done in this setup
Expand Down Expand Up @@ -174,7 +183,12 @@ private MockTransportService buildService(final String name, final Version versi
}

protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
return buildService(name, version, clusterSettings, Settings.EMPTY);
}

protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings) {
return buildService(name, version, clusterSettings, settings, true, true);
}

@Override
Expand Down Expand Up @@ -1999,7 +2013,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea
assertEquals("handshake failed", exception.getCause().getMessage());
}

ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, Time
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected Version getCurrentVersion() {

};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -111,7 +110,7 @@ public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();

ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected Version getCurrentVersion() {

};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
Collections.emptySet());
mockTransportService.start();
return mockTransportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected Version getCurrentVersion() {

};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
Collections.emptySet());
mockTransportService.start();
return mockTransportService;
Expand Down