Skip to content

Commit ccf7903

Browse files
committed
Transition transport apis to use void listeners (#27440)
Currently we use ActionListener<TcpChannel> for connect, close, and send message listeners in TcpTransport. However, all of the listeners have to capture a reference to a channel in the case of the exception api being called. This commit changes these listeners to be type <Void> as passing the channel to onResponse is not necessary. Additionally, this change makes it easier to integrate with low level transports (which use different implementations of TcpChannel).
1 parent fe4190d commit ccf7903

File tree

18 files changed

+104
-110
lines changed

18 files changed

+104
-110
lines changed

core/src/main/java/org/elasticsearch/transport/TcpChannel.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import java.io.IOException;
3232
import java.net.InetSocketAddress;
33-
import java.net.SocketAddress;
3433
import java.util.ArrayList;
3534
import java.util.Collections;
3635
import java.util.List;
@@ -61,7 +60,7 @@ public interface TcpChannel extends Releasable {
6160
*
6261
* @param listener to be executed
6362
*/
64-
void addCloseListener(ActionListener<TcpChannel> listener);
63+
void addCloseListener(ActionListener<Void> listener);
6564

6665

6766
/**
@@ -94,7 +93,7 @@ public interface TcpChannel extends Releasable {
9493
* @param reference to send to channel
9594
* @param listener to execute upon send completion
9695
*/
97-
void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener);
96+
void sendMessage(BytesReference reference, ActionListener<Void> listener);
9897

9998
/**
10099
* Closes the channel.
@@ -114,10 +113,10 @@ static <C extends TcpChannel> void closeChannel(C channel, boolean blocking) {
114113
*/
115114
static <C extends TcpChannel> void closeChannels(List<C> channels, boolean blocking) {
116115
if (blocking) {
117-
ArrayList<ActionFuture<TcpChannel>> futures = new ArrayList<>(channels.size());
116+
ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
118117
for (final C channel : channels) {
119118
if (channel.isOpen()) {
120-
PlainActionFuture<TcpChannel> closeFuture = PlainActionFuture.newFuture();
119+
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
121120
channel.addCloseListener(closeFuture);
122121
channel.close();
123122
futures.add(closeFuture);
@@ -136,15 +135,14 @@ static <C extends TcpChannel> void closeChannels(List<C> channels, boolean block
136135
* @param discoveryNode the node for the pending connections
137136
* @param connectionFutures representing the pending connections
138137
* @param connectTimeout to wait for a connection
139-
* @param <C> the type of channel
140138
* @throws ConnectTransportException if one of the connections fails
141139
*/
142-
static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<C>> connectionFutures,
143-
TimeValue connectTimeout) throws ConnectTransportException {
140+
static void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<Void>> connectionFutures, TimeValue connectTimeout)
141+
throws ConnectTransportException {
144142
Exception connectionException = null;
145143
boolean allConnected = true;
146144

147-
for (ActionFuture<C> connectionFuture : connectionFutures) {
145+
for (ActionFuture<Void> connectionFuture : connectionFutures) {
148146
try {
149147
connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS);
150148
} catch (TimeoutException e) {
@@ -169,8 +167,8 @@ static <C extends TcpChannel> void awaitConnected(DiscoveryNode discoveryNode, L
169167
}
170168
}
171169

172-
static void blockOnFutures(List<ActionFuture<TcpChannel>> futures) {
173-
for (ActionFuture<TcpChannel> future : futures) {
170+
static void blockOnFutures(List<ActionFuture<Void>> futures) {
171+
for (ActionFuture<Void> future : futures) {
174172
try {
175173
future.get();
176174
} catch (ExecutionException e) {

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ protected void doRunInLifecycle() throws Exception {
343343
for (TcpChannel channel : channels.getChannels()) {
344344
internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) {
345345
@Override
346-
protected void innerInnerOnResponse(TcpChannel channel) {
346+
protected void innerInnerOnResponse(Void v) {
347347
successfulPings.inc();
348348
}
349349

@@ -595,10 +595,10 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
595595
int numConnections = connectionProfile.getNumConnections();
596596
assert numConnections > 0 : "A connection profile must be configured with at least one connection";
597597
List<TcpChannel> channels = new ArrayList<>(numConnections);
598-
List<ActionFuture<TcpChannel>> connectionFutures = new ArrayList<>(numConnections);
598+
List<ActionFuture<Void>> connectionFutures = new ArrayList<>(numConnections);
599599
for (int i = 0; i < numConnections; ++i) {
600600
try {
601-
PlainActionFuture<TcpChannel> connectFuture = PlainActionFuture.newFuture();
601+
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
602602
connectionFutures.add(connectFuture);
603603
TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
604604
channels.add(channel);
@@ -940,7 +940,7 @@ protected final void doStop() {
940940
for (Map.Entry<String, List<TcpChannel>> entry : serverChannels.entrySet()) {
941941
String profile = entry.getKey();
942942
List<TcpChannel> channels = entry.getValue();
943-
ActionListener<TcpChannel> closeFailLogger = ActionListener.wrap(c -> {},
943+
ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {},
944944
e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
945945
channels.forEach(c -> c.addCloseListener(closeFailLogger));
946946
TcpChannel.closeChannels(channels, true);
@@ -1016,7 +1016,7 @@ protected void onException(TcpChannel channel, Exception e) {
10161016
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
10171017
final SendMetricListener closeChannel = new SendMetricListener(message.length()) {
10181018
@Override
1019-
protected void innerInnerOnResponse(TcpChannel channel) {
1019+
protected void innerInnerOnResponse(Void v) {
10201020
TcpChannel.closeChannel(channel, false);
10211021
}
10221022

@@ -1060,7 +1060,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
10601060
* @return the pending connection
10611061
* @throws IOException if an I/O exception occurs while opening the channel
10621062
*/
1063-
protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
1063+
protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
10641064
throws IOException;
10651065

10661066
/**
@@ -1687,20 +1687,20 @@ protected final void ensureOpen() {
16871687
/**
16881688
* This listener increments the transmitted bytes metric on success.
16891689
*/
1690-
private abstract class SendMetricListener extends NotifyOnceListener<TcpChannel> {
1690+
private abstract class SendMetricListener extends NotifyOnceListener<Void> {
16911691
private final long messageSize;
16921692

16931693
private SendMetricListener(long messageSize) {
16941694
this.messageSize = messageSize;
16951695
}
16961696

16971697
@Override
1698-
protected final void innerOnResponse(org.elasticsearch.transport.TcpChannel object) {
1698+
protected final void innerOnResponse(Void object) {
16991699
transmittedBytesMetric.inc(messageSize);
17001700
innerInnerOnResponse(object);
17011701
}
17021702

1703-
protected abstract void innerInnerOnResponse(org.elasticsearch.transport.TcpChannel object);
1703+
protected abstract void innerInnerOnResponse(Void object);
17041704
}
17051705

17061706
private final class SendListener extends SendMetricListener {
@@ -1716,7 +1716,7 @@ private SendListener(TcpChannel channel, Releasable optionalReleasable, Runnable
17161716
}
17171717

17181718
@Override
1719-
protected void innerInnerOnResponse(TcpChannel channel) {
1719+
protected void innerInnerOnResponse(Void v) {
17201720
release();
17211721
}
17221722

core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ protected FakeChannel bind(String name, InetSocketAddress address) throws IOExce
185185
}
186186

187187
@Override
188-
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout,
189-
ActionListener<TcpChannel> connectListener) throws IOException {
188+
protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
189+
throws IOException {
190190
return new FakeChannel(messageCaptor);
191191
}
192192

@@ -251,7 +251,7 @@ public void close() {
251251
}
252252

253253
@Override
254-
public void addCloseListener(ActionListener<TcpChannel> listener) {
254+
public void addCloseListener(ActionListener<Void> listener) {
255255
}
256256

257257
@Override
@@ -269,7 +269,7 @@ public InetSocketAddress getLocalAddress() {
269269
}
270270

271271
@Override
272-
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
272+
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
273273
messageCaptor.set(reference);
274274
}
275275
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ public long getNumOpenServerConnections() {
249249
}
250250

251251
@Override
252-
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> listener)
252+
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
253253
throws IOException {
254254
ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address());
255255
Channel channel = channelFuture.channel();
@@ -264,7 +264,7 @@ protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectT
264264

265265
channelFuture.addListener(f -> {
266266
if (f.isSuccess()) {
267-
listener.onResponse(nettyChannel);
267+
listener.onResponse(null);
268268
} else {
269269
Throwable cause = f.cause();
270270
if (cause instanceof Error) {

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@
3434
public class NettyTcpChannel implements TcpChannel {
3535

3636
private final Channel channel;
37-
private final CompletableFuture<TcpChannel> closeContext = new CompletableFuture<>();
37+
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
3838

3939
NettyTcpChannel(Channel channel) {
4040
this.channel = channel;
4141
this.channel.closeFuture().addListener(f -> {
4242
if (f.isSuccess()) {
43-
closeContext.complete(this);
43+
closeContext.complete(null);
4444
} else {
4545
Throwable cause = f.cause();
4646
if (cause instanceof Error) {
@@ -59,7 +59,7 @@ public void close() {
5959
}
6060

6161
@Override
62-
public void addCloseListener(ActionListener<TcpChannel> listener) {
62+
public void addCloseListener(ActionListener<Void> listener) {
6363
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
6464
}
6565

@@ -79,11 +79,11 @@ public InetSocketAddress getLocalAddress() {
7979
}
8080

8181
@Override
82-
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
82+
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
8383
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
8484
future.addListener(f -> {
8585
if (f.isSuccess()) {
86-
listener.onResponse(this);
86+
listener.onResponse(null);
8787
} else {
8888
final Throwable cause = f.cause();
8989
Netty4Utils.maybeDie(cause);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
171171
}
172172

173173
@Override
174-
protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
174+
protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
175175
throws IOException {
176176
InetSocketAddress address = node.getAddress().address();
177177
final MockSocket socket = new MockSocket();
@@ -186,7 +186,7 @@ protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
186186
MockChannel channel = new MockChannel(socket, address, "none", (c) -> {});
187187
channel.loopRead(executor);
188188
success = true;
189-
connectListener.onResponse(channel);
189+
connectListener.onResponse(null);
190190
return channel;
191191
} finally {
192192
if (success == false) {
@@ -231,7 +231,7 @@ public final class MockChannel implements Closeable, TcpChannel {
231231
private final String profile;
232232
private final CancellableThreads cancellableThreads = new CancellableThreads();
233233
private final Closeable onClose;
234-
private final CompletableFuture<TcpChannel> closeFuture = new CompletableFuture<>();
234+
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
235235

236236
/**
237237
* Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
@@ -356,14 +356,14 @@ public String toString() {
356356
public void close() {
357357
try {
358358
close0();
359-
closeFuture.complete(this);
359+
closeFuture.complete(null);
360360
} catch (IOException e) {
361361
closeFuture.completeExceptionally(e);
362362
}
363363
}
364364

365365
@Override
366-
public void addCloseListener(ActionListener<TcpChannel> listener) {
366+
public void addCloseListener(ActionListener<Void> listener) {
367367
closeFuture.whenComplete(ActionListener.toBiConsumer(listener));
368368
}
369369

@@ -386,14 +386,14 @@ public InetSocketAddress getLocalAddress() {
386386
}
387387

388388
@Override
389-
public void sendMessage(BytesReference reference, ActionListener<TcpChannel> listener) {
389+
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
390390
try {
391391
synchronized (this) {
392392
OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream());
393393
reference.writeTo(outputStream);
394394
outputStream.flush();
395395
}
396-
listener.onResponse(this);
396+
listener.onResponse(null);
397397
} catch (IOException e) {
398398
listener.onFailure(e);
399399
onException(this, e);

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.transport.nio;
2121

2222
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.ExceptionsHelper;
2423
import org.elasticsearch.action.ActionListener;
2524
import org.elasticsearch.cluster.node.DiscoveryNode;
2625
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -32,7 +31,6 @@
3231
import org.elasticsearch.common.util.concurrent.EsExecutors;
3332
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3433
import org.elasticsearch.threadpool.ThreadPool;
35-
import org.elasticsearch.transport.TcpChannel;
3634
import org.elasticsearch.transport.TcpTransport;
3735
import org.elasticsearch.transport.Transports;
3836
import org.elasticsearch.transport.nio.channel.ChannelFactory;
@@ -95,22 +93,11 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th
9593
}
9694

9795
@Override
98-
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<TcpChannel> connectListener)
96+
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
9997
throws IOException {
10098
NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
10199
openChannels.clientChannelOpened(channel);
102-
// TODO: Temporary conversion due to types
103-
channel.addConnectListener(new ActionListener<NioChannel>() {
104-
@Override
105-
public void onResponse(NioChannel nioChannel) {
106-
connectListener.onResponse(nioChannel);
107-
}
108-
109-
@Override
110-
public void onFailure(Exception e) {
111-
connectListener.onFailure(e);
112-
}
113-
});
100+
channel.addConnectListener(connectListener);
114101
return channel;
115102
}
116103

test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.ActionListener;
2525
import org.elasticsearch.common.bytes.BytesArray;
2626
import org.elasticsearch.common.bytes.BytesReference;
27-
import org.elasticsearch.transport.nio.channel.NioChannel;
2827
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
2928

3029
import java.io.IOException;
@@ -33,10 +32,10 @@
3332
public class WriteOperation {
3433

3534
private final NioSocketChannel channel;
36-
private final ActionListener<NioChannel> listener;
35+
private final ActionListener<Void> listener;
3736
private final NetworkBytesReference[] references;
3837

39-
public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<NioChannel> listener) {
38+
public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<Void> listener) {
4039
this.channel = channel;
4140
this.listener = listener;
4241
this.references = toArray(bytesReference);
@@ -46,7 +45,7 @@ public NetworkBytesReference[] getByteReferences() {
4645
return references;
4746
}
4847

49-
public ActionListener<NioChannel> getListener() {
48+
public ActionListener<Void> getListener() {
5049
return listener;
5150
}
5251

0 commit comments

Comments
 (0)