From 4fc1960a82d984161471a5c8e6e24d308fc88650 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 20 Aug 2018 08:54:55 -0600 Subject: [PATCH] Pass DiscoveryNode to initiateChannel (#32958) This is related to #32517. This commit passes the DiscoveryNode to the initiateChannel method for different Transport implementation. This will allow additional attributes (besides just the socket address) to be used when opening channels. --- .../org/elasticsearch/transport/netty4/Netty4Transport.java | 4 +++- .../main/java/org/elasticsearch/transport/TcpTransport.java | 6 +++--- .../java/org/elasticsearch/transport/TcpTransportTests.java | 2 +- .../java/org/elasticsearch/transport/MockTcpTransport.java | 4 +++- .../java/org/elasticsearch/transport/nio/NioTransport.java | 5 +++-- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 7cf607fb06d88..4b4cbd3414554 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -40,6 +40,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -228,7 +229,8 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) } @Override - protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener listener) throws IOException { + protected NettyTcpChannel initiateChannel(DiscoveryNode node, ActionListener listener) throws IOException { + InetSocketAddress address = node.getAddress().address(); ChannelFuture channelFuture = bootstrap.connect(address); Channel channel = channelFuture.channel(); if (channel == null) { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0cfeffdadfcab..72e66cc215e56 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -439,7 +439,7 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect try { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); connectionFutures.add(connectFuture); - TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture); + TcpChannel channel = initiateChannel(node, connectFuture); logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel)); channels.add(channel); } catch (Exception e) { @@ -825,12 +825,12 @@ protected void serverAcceptedChannel(TcpChannel channel) { /** * Initiate a single tcp socket channel. * - * @param address address for the initiated connection + * @param node for the initiated connection * @param connectListener listener to be called when connection complete * @return the pending connection * @throws IOException if an I/O exception occurs while opening the channel */ - protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException; + protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException; /** * Called to tear down internal resources diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 720c50e3c3b4e..533a8feb7808c 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -185,7 +185,7 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce } @Override - protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { return new FakeChannel(messageCaptor); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 7df0f041cd663..105961aa3230f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; import org.elasticsearch.cli.SuppressForbidden; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -169,7 +170,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx @Override @SuppressForbidden(reason = "real socket for mocking remote connections") - protected MockChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) throws IOException { + protected MockChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { + InetSocketAddress address = node.getAddress().address(); final MockSocket socket = new MockSocket(); final MockChannel channel = new MockChannel(socket, address, "none"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index da2f9886e593f..b8d3b62dffe07 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -85,8 +86,8 @@ protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) } @Override - protected TcpNioSocketChannel initiateChannel(InetSocketAddress address, ActionListener connectListener) - throws IOException { + protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, ActionListener connectListener) throws IOException { + InetSocketAddress address = node.getAddress().address(); TcpNioSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory); channel.addConnectListener(connectListener); return channel;