Skip to content

Commit cc1244f

Browse files
committed
Remove implementations of TransportChannel (#27388)
Right now we have unnecessary implementations of `TransportChannel`. Additionally, there are methods on the interface that are not used. This commit removes unnecessary implementations and methods.
1 parent c529476 commit cc1244f

File tree

9 files changed

+34
-137
lines changed

9 files changed

+34
-137
lines changed

core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.tasks.TaskManager;
2424

2525
import java.io.IOException;
26-
import java.util.function.Supplier;
2726

2827
public class RequestHandlerRegistry<Request extends TransportRequest> {
2928

@@ -63,7 +62,7 @@ public void processMessageReceived(Request request, TransportChannel channel) th
6362
} else {
6463
boolean success = false;
6564
try {
66-
handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
65+
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
6766
success = true;
6867
} finally {
6968
if (success == false) {
@@ -90,38 +89,4 @@ public String toString() {
9089
return handler.toString();
9190
}
9291

93-
private static class TransportChannelWrapper extends DelegatingTransportChannel {
94-
95-
private final Task task;
96-
97-
private final TaskManager taskManager;
98-
99-
TransportChannelWrapper(TaskManager taskManager, Task task, TransportChannel channel) {
100-
super(channel);
101-
this.task = task;
102-
this.taskManager = taskManager;
103-
}
104-
105-
@Override
106-
public void sendResponse(TransportResponse response) throws IOException {
107-
endTask();
108-
super.sendResponse(response);
109-
}
110-
111-
@Override
112-
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
113-
endTask();
114-
super.sendResponse(response, options);
115-
}
116-
117-
@Override
118-
public void sendResponse(Exception exception) throws IOException {
119-
endTask();
120-
super.sendResponse(exception);
121-
}
122-
123-
private void endTask() {
124-
taskManager.unregister(task);
125-
}
126-
}
12792
}

core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java renamed to core/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,62 +20,62 @@
2020
package org.elasticsearch.transport;
2121

2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.tasks.TaskManager;
2325

2426
import java.io.IOException;
2527

26-
/**
27-
* Wrapper around transport channel that delegates all requests to the
28-
* underlying channel
29-
*/
30-
public class DelegatingTransportChannel implements TransportChannel {
28+
public class TaskTransportChannel implements TransportChannel {
29+
30+
private final Task task;
3131

32+
private final TaskManager taskManager;
3233
private final TransportChannel channel;
3334

34-
protected DelegatingTransportChannel(TransportChannel channel) {
35+
TaskTransportChannel(TaskManager taskManager, Task task, TransportChannel channel) {
3536
this.channel = channel;
36-
}
37-
38-
@Override
39-
public String action() {
40-
return channel.action();
37+
this.task = task;
38+
this.taskManager = taskManager;
4139
}
4240

4341
@Override
4442
public String getProfileName() {
4543
return channel.getProfileName();
4644
}
4745

48-
@Override
49-
public long getRequestId() {
50-
return channel.getRequestId();
51-
}
52-
5346
@Override
5447
public String getChannelType() {
5548
return channel.getChannelType();
5649
}
5750

5851
@Override
5952
public void sendResponse(TransportResponse response) throws IOException {
53+
endTask();
6054
channel.sendResponse(response);
6155
}
6256

6357
@Override
6458
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
59+
endTask();
6560
channel.sendResponse(response, options);
6661
}
6762

6863
@Override
6964
public void sendResponse(Exception exception) throws IOException {
65+
endTask();
7066
channel.sendResponse(exception);
7167
}
7268

69+
@Override
70+
public Version getVersion() {
71+
return channel.getVersion();
72+
}
73+
7374
public TransportChannel getChannel() {
7475
return channel;
7576
}
7677

77-
@Override
78-
public Version getVersion() {
79-
return channel.getVersion();
78+
private void endTask() {
79+
taskManager.unregister(task);
8080
}
8181
}

core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,19 @@
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525

2626
public final class TcpTransportChannel<Channel> implements TransportChannel {
27+
2728
private final TcpTransport<Channel> transport;
28-
protected final Version version;
29-
protected final String action;
30-
protected final long requestId;
29+
private final Version version;
30+
private final String action;
31+
private final long requestId;
3132
private final String profileName;
3233
private final long reservedBytes;
3334
private final AtomicBoolean released = new AtomicBoolean();
3435
private final String channelType;
3536
private final Channel channel;
3637

37-
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
38-
long requestId, Version version, String profileName, long reservedBytes) {
38+
TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
39+
long requestId, Version version, String profileName, long reservedBytes) {
3940
this.version = version;
4041
this.channel = channel;
4142
this.transport = transport;
@@ -51,11 +52,6 @@ public String getProfileName() {
5152
return profileName;
5253
}
5354

54-
@Override
55-
public String action() {
56-
return this.action;
57-
}
58-
5955
@Override
6056
public void sendResponse(TransportResponse response) throws IOException {
6157
sendResponse(response, TransportResponseOptions.EMPTY);
@@ -78,6 +74,7 @@ public void sendResponse(Exception exception) throws IOException {
7874
release(true);
7975
}
8076
}
77+
8178
private Exception releaseBy;
8279

8380
private void release(boolean isExceptionResponse) {
@@ -91,23 +88,18 @@ private void release(boolean isExceptionResponse) {
9188
}
9289
}
9390

94-
@Override
95-
public long getRequestId() {
96-
return requestId;
97-
}
98-
9991
@Override
10092
public String getChannelType() {
10193
return channelType;
10294
}
10395

104-
public Channel getChannel() {
105-
return channel;
106-
}
107-
10896
@Override
10997
public Version getVersion() {
11098
return version;
11199
}
100+
101+
public Channel getChannel() {
102+
return channel;
103+
}
112104
}
113105

core/src/main/java/org/elasticsearch/transport/TransportChannel.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,8 @@
2828
*/
2929
public interface TransportChannel {
3030

31-
String action();
32-
3331
String getProfileName();
3432

35-
long getRequestId();
36-
3733
String getChannelType();
3834

3935
void sendResponse(TransportResponse response) throws IOException;

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,8 +1077,8 @@ static class DirectResponseChannel implements TransportChannel {
10771077
final TransportService service;
10781078
final ThreadPool threadPool;
10791079

1080-
DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId,
1081-
TransportService service, ThreadPool threadPool) {
1080+
DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, TransportService service,
1081+
ThreadPool threadPool) {
10821082
this.logger = logger;
10831083
this.localNode = localNode;
10841084
this.action = action;
@@ -1087,11 +1087,6 @@ static class DirectResponseChannel implements TransportChannel {
10871087
this.threadPool = threadPool;
10881088
}
10891089

1090-
@Override
1091-
public String action() {
1092-
return action;
1093-
}
1094-
10951090
@Override
10961091
public String getProfileName() {
10971092
return DIRECT_RESPONSE_PROFILE;
@@ -1137,13 +1132,7 @@ public void sendResponse(Exception exception) throws IOException {
11371132
if (ThreadPool.Names.SAME.equals(executor)) {
11381133
processException(handler, rtx);
11391134
} else {
1140-
threadPool.executor(handler.executor()).execute(new Runnable() {
1141-
@SuppressWarnings({"unchecked"})
1142-
@Override
1143-
public void run() {
1144-
processException(handler, rtx);
1145-
}
1146-
});
1135+
threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
11471136
}
11481137
}
11491138
}
@@ -1165,11 +1154,6 @@ protected void processException(final TransportResponseHandler handler, final Re
11651154
}
11661155
}
11671156

1168-
@Override
1169-
public long getRequestId() {
1170-
return requestId;
1171-
}
1172-
11731157
@Override
11741158
public String getChannelType() {
11751159
return "direct";

core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -471,11 +471,6 @@ public TransportResponse getCapturedResponse() {
471471
return capturedResponse;
472472
}
473473

474-
@Override
475-
public String action() {
476-
return null;
477-
}
478-
479474
@Override
480475
public String getProfileName() {
481476
return "";
@@ -494,11 +489,6 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op
494489
public void sendResponse(Exception exception) throws IOException {
495490
}
496491

497-
@Override
498-
public long getRequestId() {
499-
return 0;
500-
}
501-
502492
@Override
503493
public String getChannelType() {
504494
return "test";

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,11 +1237,6 @@ public void execute() throws Exception {
12371237
public TransportChannel createTransportChannel(final PlainActionFuture<TestResponse> listener) {
12381238
return new TransportChannel() {
12391239

1240-
@Override
1241-
public String action() {
1242-
return null;
1243-
}
1244-
12451240
@Override
12461241
public String getProfileName() {
12471242
return "";
@@ -1262,11 +1257,6 @@ public void sendResponse(Exception exception) throws IOException {
12621257
listener.onFailure(exception);
12631258
}
12641259

1265-
@Override
1266-
public long getRequestId() {
1267-
return 0;
1268-
}
1269-
12701260
@Override
12711261
public String getChannelType() {
12721262
return "replica_test";

core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -914,11 +914,6 @@ public void clear() {
914914
error.set(null);
915915
}
916916

917-
@Override
918-
public String action() {
919-
return "_noop_";
920-
}
921-
922917
@Override
923918
public String getProfileName() {
924919
return "_noop_";
@@ -942,11 +937,6 @@ public void sendResponse(Exception exception) throws IOException {
942937
assertThat(response.get(), nullValue());
943938
}
944939

945-
@Override
946-
public long getRequestId() {
947-
return 0;
948-
}
949-
950940
@Override
951941
public String getChannelType() {
952942
return "capturing";

core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.discovery.Discovery;
4848
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
4949
import org.elasticsearch.index.shard.ShardId;
50-
import org.elasticsearch.plugins.ClusterPlugin;
5150
import org.elasticsearch.test.ClusterServiceUtils;
5251
import org.elasticsearch.test.ESTestCase;
5352
import org.elasticsearch.test.VersionUtils;
@@ -378,21 +377,12 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
378377
} else {
379378
AtomicBoolean sendResponse = new AtomicBoolean(false);
380379
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
381-
@Override
382-
public String action() {
383-
return null;
384-
}
385380

386381
@Override
387382
public String getProfileName() {
388383
return null;
389384
}
390385

391-
@Override
392-
public long getRequestId() {
393-
return 0;
394-
}
395-
396386
@Override
397387
public String getChannelType() {
398388
return null;

0 commit comments

Comments
 (0)