diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index e310f3012a9fb..7eb34bcdcd3aa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -39,6 +39,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -222,7 +223,8 @@ protected ChannelHandler getClientChannelInitializer() { static final AttributeKey SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); @Override - protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { + protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener listener) throws IOException { + InetSocketAddress address = node.getAddress().address(); ChannelFuture channelFuture = bootstrap.connect(address); Channel channel = channelFuture.channel(); if (channel == null) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 47229a0df2f6e..129f0ada77d5d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -82,7 +83,8 @@ protected NioTcpServerChannel bind(String name, InetSocketAddress address) throw } @Override - protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { + InetSocketAddress address = node.getAddress().address(); NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory); channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); return channel; diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0b82417cfaa04..111c4c23aa190 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -447,7 +447,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect try { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); connectionFutures.add(connectFuture); - TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture); + TcpChannel channel = initiateChannel(node, connectFuture); logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel)); channels.add(channel); } catch (Exception e) { @@ -856,12 +856,12 @@ protected void serverAcceptedChannel(TcpChannel channel) { /** * Initiate a single tcp socket channel. * - * @param address address for the initiated connection + * @param node for the initiated connection * @param connectListener listener to be called when connection complete * @return the pending connection * @throws IOException if an I/O exception occurs while opening the channel */ - protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException; + protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException; /** * Called to tear down internal resources diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index a3d2e1bbc574e..0b6112eb51c90 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -188,7 +188,7 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce } @Override - protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { return new FakeChannel(messageCaptor); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index e6d80ac24d88e..996508bdb887a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; import org.elasticsearch.cli.SuppressForbidden; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -162,7 +163,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx @Override @SuppressForbidden(reason = "real socket for mocking remote connections") - protected MockChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected MockChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { + InetSocketAddress address = node.getAddress().address(); final MockSocket socket = new MockSocket(); final MockChannel channel = new MockChannel(socket, address, "none"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index fbe61db6ee721..19543cfdcbb15 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -85,7 +86,8 @@ protected MockServerChannel bind(String name, InetSocketAddress address) throws } @Override - protected MockSocketChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected MockSocketChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { + InetSocketAddress address = node.getAddress().address(); MockSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory); channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); return channel;