diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index fc1af1d876ade..91b54ab8f2097 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -25,7 +25,6 @@ import org.elasticsearch.tasks.TaskManager; import java.io.IOException; -import java.util.function.Supplier; public class RequestHandlerRegistry { @@ -64,7 +63,7 @@ public void processMessageReceived(Request request, TransportChannel channel) th } else { boolean success = false; try { - handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task); + handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task); success = true; } finally { if (success == false) { @@ -91,38 +90,4 @@ public String toString() { return handler.toString(); } - private static class TransportChannelWrapper extends DelegatingTransportChannel { - - private final Task task; - - private final TaskManager taskManager; - - TransportChannelWrapper(TaskManager taskManager, Task task, TransportChannel channel) { - super(channel); - this.task = task; - this.taskManager = taskManager; - } - - @Override - public void sendResponse(TransportResponse response) throws IOException { - endTask(); - super.sendResponse(response); - } - - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - endTask(); - super.sendResponse(response, options); - } - - @Override - public void sendResponse(Exception exception) throws IOException { - endTask(); - super.sendResponse(exception); - } - - private void endTask() { - taskManager.unregister(task); - } - } } diff --git a/core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java similarity index 78% rename from core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java rename to core/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java index bc3b4b3c2ab07..53400207eab4b 100644 --- a/core/src/main/java/org/elasticsearch/transport/DelegatingTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java @@ -20,24 +20,22 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; import java.io.IOException; -/** - * Wrapper around transport channel that delegates all requests to the - * underlying channel - */ -public class DelegatingTransportChannel implements TransportChannel { +public class TaskTransportChannel implements TransportChannel { + + private final Task task; + private final TaskManager taskManager; private final TransportChannel channel; - protected DelegatingTransportChannel(TransportChannel channel) { + TaskTransportChannel(TaskManager taskManager, Task task, TransportChannel channel) { this.channel = channel; - } - - @Override - public String action() { - return channel.action(); + this.task = task; + this.taskManager = taskManager; } @Override @@ -45,11 +43,6 @@ public String getProfileName() { return channel.getProfileName(); } - @Override - public long getRequestId() { - return channel.getRequestId(); - } - @Override public String getChannelType() { return channel.getChannelType(); @@ -57,25 +50,32 @@ public String getChannelType() { @Override public void sendResponse(TransportResponse response) throws IOException { + endTask(); channel.sendResponse(response); } @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + endTask(); channel.sendResponse(response, options); } @Override public void sendResponse(Exception exception) throws IOException { + endTask(); channel.sendResponse(exception); } + @Override + public Version getVersion() { + return channel.getVersion(); + } + public TransportChannel getChannel() { return channel; } - @Override - public Version getVersion() { - return channel.getVersion(); + private void endTask() { + taskManager.unregister(task); } } diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index 03632eb951a72..ae9fa22f70e32 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -24,18 +24,19 @@ import java.util.concurrent.atomic.AtomicBoolean; public final class TcpTransportChannel implements TransportChannel { + private final TcpTransport transport; - protected final Version version; - protected final String action; - protected final long requestId; + private final Version version; + private final String action; + private final long requestId; private final String profileName; private final long reservedBytes; private final AtomicBoolean released = new AtomicBoolean(); private final String channelType; private final Channel channel; - public TcpTransportChannel(TcpTransport transport, Channel channel, String channelType, String action, - long requestId, Version version, String profileName, long reservedBytes) { + TcpTransportChannel(TcpTransport transport, Channel channel, String channelType, String action, + long requestId, Version version, String profileName, long reservedBytes) { this.version = version; this.channel = channel; this.transport = transport; @@ -51,11 +52,6 @@ public String getProfileName() { return profileName; } - @Override - public String action() { - return this.action; - } - @Override public void sendResponse(TransportResponse response) throws IOException { sendResponse(response, TransportResponseOptions.EMPTY); @@ -78,6 +74,7 @@ public void sendResponse(Exception exception) throws IOException { release(true); } } + private Exception releaseBy; private void release(boolean isExceptionResponse) { @@ -91,23 +88,18 @@ private void release(boolean isExceptionResponse) { } } - @Override - public long getRequestId() { - return requestId; - } - @Override public String getChannelType() { return channelType; } - public Channel getChannel() { - return channel; - } - @Override public Version getVersion() { return version; } + + public Channel getChannel() { + return channel; + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannel.java b/core/src/main/java/org/elasticsearch/transport/TransportChannel.java index dbf12f297c139..7aeddfc9223af 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -28,12 +28,8 @@ */ public interface TransportChannel { - String action(); - String getProfileName(); - long getRequestId(); - String getChannelType(); void sendResponse(TransportResponse response) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index adf87e3195fda..2f87deb3bd759 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1117,8 +1117,8 @@ static class DirectResponseChannel implements TransportChannel { final TransportService service; final ThreadPool threadPool; - DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, - TransportService service, ThreadPool threadPool) { + DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, TransportService service, + ThreadPool threadPool) { this.logger = logger; this.localNode = localNode; this.action = action; @@ -1127,11 +1127,6 @@ static class DirectResponseChannel implements TransportChannel { this.threadPool = threadPool; } - @Override - public String action() { - return action; - } - @Override public String getProfileName() { return DIRECT_RESPONSE_PROFILE; @@ -1177,13 +1172,7 @@ public void sendResponse(Exception exception) throws IOException { if (ThreadPool.Names.SAME.equals(executor)) { processException(handler, rtx); } else { - threadPool.executor(handler.executor()).execute(new Runnable() { - @SuppressWarnings({"unchecked"}) - @Override - public void run() { - processException(handler, rtx); - } - }); + threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx)); } } } @@ -1205,11 +1194,6 @@ protected void processException(final TransportResponseHandler handler, final Re } } - @Override - public long getRequestId() { - return requestId; - } - @Override public String getChannelType() { return "direct"; diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 7e04e99b1742e..54253e9620745 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -471,11 +471,6 @@ public TransportResponse getCapturedResponse() { return capturedResponse; } - @Override - public String action() { - return null; - } - @Override public String getProfileName() { return ""; @@ -494,11 +489,6 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op public void sendResponse(Exception exception) throws IOException { } - @Override - public long getRequestId() { - return 0; - } - @Override public String getChannelType() { return "test"; diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 1127a5ced580d..d2472da34f56c 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1237,11 +1237,6 @@ public void execute() throws Exception { public TransportChannel createTransportChannel(final PlainActionFuture listener) { return new TransportChannel() { - @Override - public String action() { - return null; - } - @Override public String getProfileName() { return ""; @@ -1262,11 +1257,6 @@ public void sendResponse(Exception exception) throws IOException { listener.onFailure(exception); } - @Override - public long getRequestId() { - return 0; - } - @Override public String getChannelType() { return "replica_test"; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 9693a1baadc79..42ec72c981007 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -914,11 +914,6 @@ public void clear() { error.set(null); } - @Override - public String action() { - return "_noop_"; - } - @Override public String getProfileName() { return "_noop_"; @@ -942,11 +937,6 @@ public void sendResponse(Exception exception) throws IOException { assertThat(response.get(), nullValue()); } - @Override - public long getRequestId() { - return 0; - } - @Override public String getChannelType() { return "capturing"; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index b0dc783349ca8..d2068944ab182 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -47,7 +47,6 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -378,21 +377,12 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception { } else { AtomicBoolean sendResponse = new AtomicBoolean(false); request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() { - @Override - public String action() { - return null; - } @Override public String getProfileName() { return null; } - @Override - public long getRequestId() { - return 0; - } - @Override public String getChannelType() { return null;