From 138452ae21c4837fd2a8af2ac4a71ad313dfd447 Mon Sep 17 00:00:00 2001 From: jaymode Date: Wed, 17 Oct 2018 10:32:14 -0600 Subject: [PATCH 1/8] Responses can use Writeable.Reader interface In order to remove Streamable from the codebase, Response objects need to be read using the Writeable.Reader interface which this change enables. This change enables the use of Writeable.Reader by adding the `Action#getResponseReader` method. The default implementation simply uses the existing `newResponse` method and the readFrom method. As responses are migrated to the Writeable.Reader interface, Action classes can be updated to throw an UnsupportedOperationException when `newResponse` is called and override the `getResponseReader` method. Relates #34389 --- .../netty4/Netty4ScheduledPingTests.java | 3 +- .../java/org/elasticsearch/action/Action.java | 15 ++ .../action/ActionListenerResponseHandler.java | 18 +- .../elasticsearch/action/ActionResponse.java | 7 + .../action/TransportActionNodeProxy.java | 2 +- .../tasks/get/TransportGetTaskAction.java | 7 +- .../shards/ClusterSearchShardsAction.java | 8 +- .../shards/ClusterSearchShardsResponse.java | 42 ++--- .../TransportClusterSearchShardsAction.java | 9 +- .../action/ingest/IngestActionForwarder.java | 2 +- .../TransportResyncReplicationAction.java | 8 +- .../action/search/MultiSearchResponse.java | 4 + .../action/search/SearchTransportService.java | 35 ++-- .../broadcast/TransportBroadcastAction.java | 7 +- .../node/TransportBroadcastByNodeAction.java | 6 +- .../master/TransportMasterNodeAction.java | 17 +- .../support/nodes/TransportNodesAction.java | 8 +- .../TransportReplicationAction.java | 22 ++- ...ransportInstanceSingleOperationAction.java | 8 +- .../shard/TransportSingleShardAction.java | 13 +- .../support/tasks/TransportTasksAction.java | 6 +- .../TransportClientNodesService.java | 6 +- .../discovery/zen/MasterFaultDetection.java | 13 +- .../discovery/zen/NodesFaultDetection.java | 13 +- .../gateway/LocalAllocateDangledIndices.java | 6 +- .../indices/flush/SyncedFlushService.java | 18 +- .../recovery/PeerRecoveryTargetService.java | 7 +- .../RecoveryTranslogOperationsResponse.java | 6 +- .../elasticsearch/search/SearchService.java | 4 + .../search/dfs/DfsSearchResult.java | 4 + .../search/fetch/FetchSearchResult.java | 4 + .../search/fetch/QueryFetchSearchResult.java | 4 + .../fetch/ScrollQueryFetchSearchResult.java | 4 + .../search/query/QuerySearchResult.java | 4 + .../search/query/ScrollQuerySearchResult.java | 4 + .../EmptyTransportResponseHandler.java | 3 +- .../transport/RemoteClusterAwareClient.java | 2 +- .../transport/RemoteClusterConnection.java | 10 +- .../elasticsearch/transport/TcpTransport.java | 24 ++- .../elasticsearch/transport/Transport.java | 19 ++- .../transport/TransportActionProxy.java | 55 +++--- .../TransportChannelResponseHandler.java | 13 +- .../transport/TransportMessage.java | 6 + .../transport/TransportResponse.java | 11 ++ .../transport/TransportResponseHandler.java | 24 --- .../transport/TransportService.java | 24 ++- .../ClusterSearchShardsResponseTests.java | 3 +- .../search/TransportSearchActionTests.java | 2 +- .../TransportClientNodesServiceTests.java | 6 +- .../RemoteClusterConnectionTests.java | 12 +- .../transport/TransportActionProxyTests.java | 39 +++-- .../test/transport/CapturingTransport.java | 7 +- .../AbstractSimpleTransportTestCase.java | 160 ++++++++++-------- ...curityServerTransportInterceptorTests.java | 5 +- ...ServerTransportFilterIntegrationTests.java | 9 +- 55 files changed, 456 insertions(+), 322 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index fae4082e81828..0f3185add0833 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -102,7 +103,7 @@ public void testScheduledPing() throws Exception { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/action/Action.java b/server/src/main/java/org/elasticsearch/action/Action.java index 771762ad15c30..f0df6202072a4 100644 --- a/server/src/main/java/org/elasticsearch/action/Action.java +++ b/server/src/main/java/org/elasticsearch/action/Action.java @@ -19,6 +19,7 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.transport.TransportRequestOptions; @@ -45,9 +46,23 @@ public String name() { /** * Creates a new response instance. + * @deprecated Implement {@link #getResponseReader()} instead and make this method throw an + * {@link UnsupportedOperationException} */ + @Deprecated public abstract Response newResponse(); + /** + * Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput} + */ + public Writeable.Reader getResponseReader() { + return in -> { + Response response = newResponse(); + response.readFrom(in); + return response; + }; + } + /** * Optional request options for the action. */ diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index f258be3a16137..432cef6ad3029 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -19,13 +19,15 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import java.io.IOException; import java.util.Objects; -import java.util.function.Supplier; /** * A simple base class for action response listeners, defaulting to using the SAME executor (as its @@ -34,11 +36,11 @@ public class ActionListenerResponseHandler implements TransportResponseHandler { private final ActionListener listener; - private final Supplier responseSupplier; + private final Writeable.Reader reader; - public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { + public ActionListenerResponseHandler(ActionListener listener, Writeable.Reader reader) { this.listener = Objects.requireNonNull(listener); - this.responseSupplier = Objects.requireNonNull(responseSupplier); + this.reader = Objects.requireNonNull(reader); } @Override @@ -52,12 +54,12 @@ public void handleException(TransportException e) { } @Override - public Response newInstance() { - return responseSupplier.get(); + public String executor() { + return ThreadPool.Names.SAME; } @Override - public String executor() { - return ThreadPool.Names.SAME; + public Response read(StreamInput in) throws IOException { + return reader.read(in); } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionResponse.java b/server/src/main/java/org/elasticsearch/action/ActionResponse.java index a1cd3068a269f..dd019ba3f5591 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ActionResponse.java @@ -30,6 +30,13 @@ */ public abstract class ActionResponse extends TransportResponse { + public ActionResponse() { + } + + public ActionResponse(StreamInput in) throws IOException { + super(in); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index c369deb0b10b3..7d8dbd1f975bd 100644 --- a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -48,6 +48,6 @@ public void execute(final DiscoveryNode node, final Request request, final Actio return; } transportService.sendRequest(node, action.name(), request, transportOptions, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 927d2e47680c5..69fc7ee376c0b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -119,8 +120,10 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public GetTaskResponse newInstance() { - return new GetTaskResponse(); + public GetTaskResponse read(StreamInput in) throws IOException { + GetTaskResponse response = new GetTaskResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java index ec936c623a24a..869aecf095431 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.shards; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; public class ClusterSearchShardsAction extends Action { @@ -32,6 +33,11 @@ private ClusterSearchShardsAction() { @Override public ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return ClusterSearchShardsResponse::new; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index f8d448d0fe11c..141bcd86f1252 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -38,12 +38,27 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], new DiscoveryNode[0], Collections.emptyMap()); - private ClusterSearchShardsGroup[] groups; - private DiscoveryNode[] nodes; - private Map indicesAndFilters; - - public ClusterSearchShardsResponse() { + private final ClusterSearchShardsGroup[] groups; + private final DiscoveryNode[] nodes; + private final Map indicesAndFilters; + public ClusterSearchShardsResponse(StreamInput in) throws IOException { + super(in); + groups = new ClusterSearchShardsGroup[in.readVInt()]; + for (int i = 0; i < groups.length; i++) { + groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in); + } + nodes = new DiscoveryNode[in.readVInt()]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = new DiscoveryNode(in); + } + int size = in.readVInt(); + indicesAndFilters = new HashMap<>(); + for (int i = 0; i < size; i++) { + String index = in.readString(); + AliasFilter aliasFilter = new AliasFilter(in); + indicesAndFilters.put(index, aliasFilter); + } } public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, @@ -67,22 +82,7 @@ public Map getIndicesAndFilters() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - groups = new ClusterSearchShardsGroup[in.readVInt()]; - for (int i = 0; i < groups.length; i++) { - groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in); - } - nodes = new DiscoveryNode[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = new DiscoveryNode(in); - } - int size = in.readVInt(); - indicesAndFilters = new HashMap<>(); - for (int i = 0; i < size; i++) { - String index = in.readString(); - AliasFilter aliasFilter = new AliasFilter(in); - indicesAndFilters.put(index, aliasFilter); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 9774ecdffba17..f4f36ca4d65e9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -39,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -72,7 +74,12 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C @Override protected ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java index 6f5147c38bdbb..ae5a736bde66d 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java @@ -49,7 +49,7 @@ public IngestActionForwarder(TransportService transportService) { public void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) { transportService.sendRequest(randomIngestNode(), action.name(), request, - new ActionListenerResponseHandler(listener, action::newResponse)); + new ActionListenerResponseHandler(listener, action.getResponseReader())); } private DiscoveryNode randomIngestNode() { diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 6d0c35345b1fa..50d75b20dc82b 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Consumer; import java.util.function.Supplier; @@ -151,8 +153,10 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima transportOptions, new TransportResponseHandler() { @Override - public ResyncReplicationResponse newInstance() { - return newResponseInstance(); + public ResyncReplicationResponse read(StreamInput in) throws IOException { + ResyncReplicationResponse response = newResponseInstance(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index cb30385ecc868..f2b1b0d5c6265 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -135,6 +135,10 @@ public Exception getFailure() { MultiSearchResponse() { } + MultiSearchResponse(StreamInput in) throws IOException { + readFrom(in); + } + public MultiSearchResponse(Item[] items, long tookInMillis) { this.items = items; this.tookInMillis = tookInMillis; diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index fd43a948ee5fb..302ed4ccbfec9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; @@ -60,7 +61,6 @@ import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -119,7 +119,7 @@ public void sendCanMatch(Transport.Connection connection, final ShardSearchTrans public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -133,11 +133,11 @@ public void sendExecuteQuery(Transport.Connection connection, final ShardSearchT // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // this used to be the QUERY_AND_FETCH which doesn't exist anymore. final boolean fetchDocuments = request.numberOfShards() == 1; - Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, @@ -155,8 +155,8 @@ public void sendExecuteScrollQuery(Transport.Connection connection, final Intern public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, - clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, clientConnections, + connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -279,6 +279,10 @@ public static class SearchFreeContextResponse extends TransportResponse { SearchFreeContextResponse() { } + SearchFreeContextResponse(StreamInput in) throws IOException { + freed = in.readBoolean(); + } + SearchFreeContextResponse(boolean freed) { this.freed = freed; } @@ -306,22 +310,20 @@ public static void registerRequestHandler(TransportService transportService, Sea boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new, (request, channel, task) -> { boolean freed = searchService.freeContext(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME, (request, channel, task) -> { searchService.freeAllScrollContexts(); channel.sendResponse(TransportResponse.Empty.INSTANCE); }); TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, (request, channel, task) -> { @@ -352,8 +354,8 @@ public void onFailure(Exception e) { searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>( channel, QUERY_ACTION_NAME, request)); }); - TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, - (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); + TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, + (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, QuerySearchRequest::new, (request, channel, task) -> { @@ -395,8 +397,7 @@ public void onFailure(Exception e) { (request, channel, task) -> { searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request)); }); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, - (Supplier) SearchService.CanMatchResponse::new); + TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); } @@ -419,9 +420,9 @@ final class ConnectionCountingHandler extend private final Map clientConnections; private final String nodeId; - ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + ConnectionCountingHandler(final ActionListener listener, final Writeable.Reader responseReader, final Map clientConnections, final String nodeId) { - super(listener, responseSupplier); + super(listener, responseReader); this.clientConnections = clientConnections; this.nodeId = nodeId; // Increment the number of connections for this node by one diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 22c4a70b0ea55..27dcb11da3869 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; @@ -173,8 +174,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting } else { transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler() { @Override - public ShardResponse newInstance() { - return newShardResponse(); + public ShardResponse read(StreamInput in) throws IOException { + ShardResponse response = newShardResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 9079238b7b62e..f097539626458 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -313,8 +313,10 @@ private void sendNodeRequest(final DiscoveryNode node, List shards } transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return new NodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = new NodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index a9ed05ac0377f..2b369c2591e54 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,6 +48,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Predicate; import java.util.function.Supplier; @@ -101,8 +103,19 @@ protected TransportMasterNodeAction(Settings settings, String actionName, boolea protected abstract String executor(); + /** + * Returns a new response instance for serialization. This is deprecated; new implementors + * should override {@link #read(StreamInput)} and use the {@link Writeable.Reader} interface. + */ + @Deprecated protected abstract Response newResponse(); + protected Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; + } + protected abstract void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception; /** @@ -201,8 +214,8 @@ protected void doRun() throws Exception { } else { DiscoveryNode masterNode = nodes.getMasterNode(); final String actionName = getMasterActionName(masterNode); - transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, - TransportMasterNodeAction.this::newResponse) { + transportService.sendRequest(masterNode, actionName, request, + new ActionListenerResponseHandler(listener, TransportMasterNodeAction.this::read) { @Override public void handleException(final TransportException exp) { Throwable cause = exp.unwrapCause(); diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 2be4e5bf053cc..317792c610479 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -39,6 +40,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -186,8 +188,10 @@ void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return newNodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = newNodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 695c9162633f6..820ab0300d69a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -317,12 +318,17 @@ public void onResponse(PrimaryShardReference primaryShardReference) { // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. final ShardRouting primary = primaryShardReference.routingEntry(); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; + final Writeable.Reader reader = in -> { + Response response = TransportReplicationAction.this.newResponseInstance(); + response.readFrom(in); + return response; + }; DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - TransportReplicationAction.this::newResponseInstance) { + reader) { @Override public void handleResponse(Response response) { @@ -577,7 +583,7 @@ public void onNewClusterState(ClusterState state) { String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), @@ -813,8 +819,10 @@ private void performAction(final DiscoveryNode node, final String action, final transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponseInstance(); + public Response read(StreamInput in) throws IOException { + Response response = newResponseInstance(); + response.readFrom(in); + return response; } @Override @@ -1186,7 +1194,11 @@ protected void sendReplicaRequest( final ConcreteReplicaRequest replicaRequest, final DiscoveryNode node, final ActionListener listener) { - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, ReplicaResponse::new); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { + ReplicaResponse replicaResponse = new ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + }); transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index e8e710aa81f2c..3a5d8d0e382e9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.NodeClosedException; @@ -47,6 +48,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Supplier; public abstract class TransportInstanceSingleOperationAction, Response extends ActionResponse> @@ -178,8 +180,10 @@ protected void doStart(ClusterState clusterState) { transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 436089ab3be73..0a50413e96964 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -182,8 +183,10 @@ public void start() { // just execute it on the local node transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override @@ -246,8 +249,10 @@ private void perform(@Nullable final Exception currentFailure) { transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 38a0d96600ce8..dad2bb8ad0896 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -270,8 +270,10 @@ private void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeTasksResponse newInstance() { - return new NodeTasksResponse(); + public NodeTasksResponse read(StreamInput in) throws IOException { + NodeTasksResponse response = new NodeTasksResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index aa0672d80ba1d..0cfc1f5004ce8 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -511,8 +511,10 @@ protected void doRun() throws Exception { new TransportResponseHandler() { @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + final ClusterStateResponse clusterStateResponse = new ClusterStateResponse(); + clusterStateResponse.readFrom(in); + return clusterStateResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 5acf2effad390..7ae1ce4d7f555 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -225,8 +225,8 @@ public void run() { transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new TransportResponseHandler() { @Override - public MasterPingResponseResponse newInstance() { - return new MasterPingResponseResponse(); + public MasterPingResponseResponse read(StreamInput in) { + return new MasterPingResponseResponse(in); } @Override @@ -433,14 +433,7 @@ private static class MasterPingResponseResponse extends TransportResponse { private MasterPingResponseResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private MasterPingResponseResponse(StreamInput in) { } } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 57e5cab020be1..d3d5e74a17682 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -226,8 +226,8 @@ public void run() { .withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler() { @Override - public PingResponse newInstance() { - return new PingResponse(); + public PingResponse read(StreamInput in) { + return new PingResponse(in); } @Override @@ -359,14 +359,7 @@ private static class PingResponse extends TransportResponse { private PingResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private PingResponse(StreamInput in) { } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java index 7bc2e38dde024..5630ceea72945 100644 --- a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java +++ b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java @@ -84,8 +84,10 @@ public void allocateDangled(Collection indices, final Listener li AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()])); transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler() { @Override - public AllocateDangledResponse newInstance() { - return new AllocateDangledResponse(); + public AllocateDangledResponse read(StreamInput in) throws IOException { + final AllocateDangledResponse response = new AllocateDangledResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index fb7885a217e01..aeb88021f26e1 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -313,8 +313,10 @@ protected void getInflightOpsCount(final ShardId shardId, ClusterState state, In transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId), new TransportResponseHandler() { @Override - public InFlightOpsResponse newInstance() { - return new InFlightOpsResponse(); + public InFlightOpsResponse read(StreamInput in) throws IOException { + InFlightOpsResponse response = new InFlightOpsResponse(); + response.readFrom(in); + return response; } @Override @@ -383,8 +385,10 @@ void sendSyncRequests(final String syncId, final List shards, Clus transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId), new TransportResponseHandler() { @Override - public ShardSyncedFlushResponse newInstance() { - return new ShardSyncedFlushResponse(); + public ShardSyncedFlushResponse read(StreamInput in) throws IOException { + ShardSyncedFlushResponse response = new ShardSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override @@ -437,8 +441,10 @@ void sendPreSyncRequests(final List shards, final ClusterState sta } transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler() { @Override - public PreSyncedFlushResponse newInstance() { - return new PreSyncedFlushResponse(); + public PreSyncedFlushResponse read(StreamInput in) throws IOException { + PreSyncedFlushResponse response = new PreSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index f60994a4bced4..39709eb3ac2ff 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -195,8 +196,10 @@ private void doRecovery(final long recoveryId) { transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override - public RecoveryResponse newInstance() { - return new RecoveryResponse(); + public RecoveryResponse read(StreamInput in) throws IOException { + RecoveryResponse recoveryResponse = new RecoveryResponse(); + recoveryResponse.readFrom(in); + return recoveryResponse; } }).txGet())); final RecoveryResponse recoveryResponse = responseHolder.get(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java index 530b8b67415d3..8633380f3947a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java @@ -63,8 +63,10 @@ public void readFrom(final StreamInput in) throws IOException { static TransportResponseHandler HANDLER = new FutureTransportResponseHandler() { @Override - public RecoveryTranslogOperationsResponse newInstance() { - return new RecoveryTranslogOperationsResponse(); + public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException { + RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse(); + response.readFrom(in); + return response; } }; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index d8829bd11d386..4f4be95337315 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1101,6 +1101,10 @@ public static final class CanMatchResponse extends SearchPhaseResult { public CanMatchResponse() { } + public CanMatchResponse(StreamInput in) throws IOException { + this.canMatch = in.readBoolean(); + } + public CanMatchResponse(boolean canMatch) { this.canMatch = canMatch; } diff --git a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 8de89089c4f01..718b895217433 100644 --- a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -46,6 +46,10 @@ public class DfsSearchResult extends SearchPhaseResult { public DfsSearchResult() { } + public DfsSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public DfsSearchResult(long id, SearchShardTarget shardTarget) { this.setSearchShardTarget(shardTarget); this.requestId = id; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index a5f27733ad28a..12391151861d0 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -38,6 +38,10 @@ public final class FetchSearchResult extends SearchPhaseResult { public FetchSearchResult() { } + public FetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public FetchSearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 8d1e6276e65d9..0a5a7cec375db 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -38,6 +38,10 @@ public final class QueryFetchSearchResult extends SearchPhaseResult { public QueryFetchSearchResult() { } + public QueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { this.queryResult = queryResult; this.fetchResult = fetchResult; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index 55aa4a96d018c..6b0a8b619bff3 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -36,6 +36,10 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { public ScrollQueryFetchSearchResult() { } + public ScrollQueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 2aded57ece04c..43654823914b4 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -66,6 +66,10 @@ public final class QuerySearchResult extends SearchPhaseResult { public QuerySearchResult() { } + public QuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index 6401459489955..632d148ea901b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -35,6 +35,10 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult { public ScrollQuerySearchResult() { } + public ScrollQuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java index c5814cf0fefcc..7ff1ef8391fd6 100644 --- a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; public class EmptyTransportResponseHandler implements TransportResponseHandler { @@ -32,7 +33,7 @@ public EmptyTransportResponseHandler(String executor) { } @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index d93bbb57201e2..8e72e6d5768f1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -47,7 +47,7 @@ void doExecute(Action action, Request request, ActionListener { Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c9f3a2aa36540..48f086ad972bf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -218,8 +218,8 @@ private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, new TransportResponseHandler() { @Override - public ClusterSearchShardsResponse newInstance() { - return new ClusterSearchShardsResponse(); + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override @@ -591,8 +591,10 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl } @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + ClusterStateResponse response = new ClusterStateResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 27b4aa7293e18..ad41e8c2902a3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -205,7 +205,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric readBytesMetric = new MeanMetric(); private final MeanMetric transmittedBytesMetric = new MeanMetric(); - private volatile Map requestHandlers = Collections.emptyMap(); + private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final TransportLogger transportLogger; private final BytesReference pingMessage; @@ -284,8 +284,8 @@ private static class HandshakeResponseHandler implements TransportResponseHandle } @Override - public VersionHandshakeResponse newInstance() { - return new VersionHandshakeResponse(); + public VersionHandshakeResponse read(StreamInput in) throws IOException { + return new VersionHandshakeResponse(in); } @Override @@ -1273,7 +1273,8 @@ public final void messageReceived(BytesReference reference, TcpChannel channel) if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener); + TransportResponseHandler theHandler = + responseHandlers.onResponseReceived(requestId, messageListener); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { @@ -1319,8 +1320,9 @@ static void ensureVersionCompatibility(Version version, Version currentVersion, } } - private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) { - final TransportResponse response; + private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, + final TransportResponseHandler handler) { + final T response; try { response = handler.read(stream); response.remoteAddress(new TransportAddress(remoteAddress)); @@ -1469,17 +1471,13 @@ public void onFailure(Exception e) { } private static final class VersionHandshakeResponse extends TransportResponse { - private Version version; + private final Version version; private VersionHandshakeResponse(Version version) { this.version = version; } - private VersionHandshakeResponse() { - } - - @Override - public void readFrom(StreamInput in) throws IOException { + private VersionHandshakeResponse(StreamInput in) throws IOException { super.readFrom(in); version = Version.readVersion(in); } @@ -1736,7 +1734,7 @@ public final ResponseHandlers getResponseHandlers() { } @Override - public final RequestHandlerRegistry getRequestHandler(String action) { + public final RequestHandlerRegistry getRequestHandler(String action) { return requestHandlers.get(action); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index fc1f0c9e5ec0f..e13213dca066a 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -54,7 +54,7 @@ public interface Transport extends LifecycleComponent { * Returns the registered request handler registry for the given action or null if it's not registered * @param action the action to look up */ - RequestHandlerRegistry getRequestHandler(String action); + RequestHandlerRegistry getRequestHandler(String action); void addMessageListener(TransportMessageListener listener); @@ -184,7 +184,7 @@ public String action() { * This class is a registry that allows */ final class ResponseHandlers { - private final ConcurrentMapLong handlers = ConcurrentCollections + private final ConcurrentMapLong> handlers = ConcurrentCollections .newConcurrentMapLongWithAggressiveConcurrency(); private final AtomicLong requestIdGenerator = new AtomicLong(); @@ -208,7 +208,7 @@ public ResponseContext remove(long requestId) { * @return the new request ID * @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions) */ - public long add(ResponseContext holder) { + public long add(ResponseContext holder) { long requestId = newRequestId(); ResponseContext existing = handlers.put(requestId, holder); assert existing == null : "request ID already in use: " + requestId; @@ -226,10 +226,10 @@ long newRequestId() { /** * Removes and returns all {@link ResponseContext} instances that match the predicate */ - public List prune(Predicate predicate) { - final List holders = new ArrayList<>(); - for (Map.Entry entry : handlers.entrySet()) { - ResponseContext holder = entry.getValue(); + public List> prune(Predicate predicate) { + final List> holders = new ArrayList<>(); + for (Map.Entry> entry : handlers.entrySet()) { + ResponseContext holder = entry.getValue(); if (predicate.test(holder)) { ResponseContext remove = handlers.remove(entry.getKey()); if (remove != null) { @@ -245,8 +245,9 @@ public List prune(Predicate predicate) { * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ - public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) { - ResponseContext context = handlers.remove(requestId); + public TransportResponseHandler onResponseReceived(final long requestId, + final TransportMessageListener listener) { + ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { return null; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index a17509e826003..d65343a1a130f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.function.Function; -import java.util.function.Supplier; /** * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second @@ -43,10 +42,10 @@ private static class ProxyRequestHandler implements Tran private final TransportService service; private final String action; - private final Function> responseFunction; + private final Function> responseFunction; ProxyRequestHandler(TransportService service, String action, Function> responseFunction) { + Writeable.Reader> responseFunction) { this.service = service; this.action = action; this.responseFunction = responseFunction; @@ -63,17 +62,17 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro private static class ProxyResponseHandler implements TransportResponseHandler { - private final Supplier responseFactory; + private final Writeable.Reader reader; private final TransportChannel channel; - ProxyResponseHandler(TransportChannel channel, Supplier responseFactory) { - this.responseFactory = responseFactory; + ProxyResponseHandler(TransportChannel channel, Writeable.Reader reader) { + this.reader = reader; this.channel = channel; - } + @Override - public T newInstance() { - return responseFactory.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override @@ -101,26 +100,24 @@ public String executor() { } static class ProxyRequest extends TransportRequest { - T wrapped; - Writeable.Reader reader; - DiscoveryNode targetNode; - - ProxyRequest(Writeable.Reader reader) { - this.reader = reader; - } + final T wrapped; + final DiscoveryNode targetNode; ProxyRequest(T wrapped, DiscoveryNode targetNode) { this.wrapped = wrapped; this.targetNode = targetNode; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { targetNode = new DiscoveryNode(in); wrapped = reader.read(in); } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -133,21 +130,23 @@ public void writeTo(StreamOutput out) throws IOException { * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type changes based on the upcoming request (quite rare) */ - public static void registerProxyAction(TransportService service, String action, - Function> responseFunction) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, responseFunction)); + public static void registerProxyActionWithDynamicResponseType(TransportService service, String action, + Function> responseFunction) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, responseFunction)); } /** * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type is always the same (most of the cases). */ - public static void registerProxyAction(TransportService service, String action, Supplier responseSupplier) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, request -> responseSupplier)); + public static void registerProxyAction(TransportService service, String action, + Writeable.Reader reader) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, request -> reader)); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 4ba2769edb4a2..6b45feec94859 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -21,10 +21,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel @@ -34,19 +35,19 @@ public class TransportChannelResponseHandler implem private final Logger logger; private final TransportChannel channel; private final String extraInfoOnError; - private final Supplier responseSupplier; + private final Writeable.Reader reader; public TransportChannelResponseHandler(Logger logger, TransportChannel channel, String extraInfoOnError, - Supplier responseSupplier) { + Writeable.Reader reader) { this.logger = logger; this.channel = channel; this.extraInfoOnError = extraInfoOnError; - this.responseSupplier = responseSupplier; + this.reader = reader; } @Override - public T newInstance() { - return responseSupplier.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index ecaca73b2db57..4c328ba65c01d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -39,6 +39,12 @@ public TransportAddress remoteAddress() { return remoteAddress; } + public TransportMessage() { + } + + public TransportMessage(StreamInput in) throws IOException { + } + @Override public void readFrom(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java index 25ae72a479f7d..7786eca9d3610 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -19,8 +19,19 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + public abstract class TransportResponse extends TransportMessage { + public TransportResponse() { + } + + public TransportResponse(StreamInput in) throws IOException { + super(in); + } + public static class Empty extends TransportResponse { public static final Empty INSTANCE = new Empty(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index fbe477ad04b1d..29720216cf400 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,34 +19,10 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; -import java.io.IOException; - public interface TransportResponseHandler extends Writeable.Reader { - /** - * @deprecated Implement {@link #read(StreamInput)} instead. - */ - @Deprecated - default T newInstance() { - throw new UnsupportedOperationException(); - } - - /** - * deserializes a new instance of the return type from the stream. - * called by the infra when de-serializing the response. - * - * @return the deserialized response. - */ - @Override - default T read(StreamInput in) throws IOException { - T instance = newInstance(); - instance.readFrom(in); - return instance; - } - void handleResponse(T response); void handleException(TransportException exp); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index db14fd015fd82..c2ae982b3dce1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -434,8 +434,8 @@ public HandshakeResponse handshake( PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @Override - public HandshakeResponse newInstance() { - return new HandshakeResponse(); + public HandshakeResponse read(StreamInput in) throws IOException { + return new HandshakeResponse(in); } }); sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, @@ -468,12 +468,9 @@ private HandshakeRequest() { } public static class HandshakeResponse extends TransportResponse { - private DiscoveryNode discoveryNode; - private ClusterName clusterName; - private Version version; - - HandshakeResponse() { - } + private final DiscoveryNode discoveryNode; + private final ClusterName clusterName; + private final Version version; public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { this.discoveryNode = discoveryNode; @@ -481,9 +478,8 @@ public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, V this.clusterName = clusterName; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public HandshakeResponse(StreamInput in) throws IOException { + super(in); discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); clusterName = new ClusterName(in); version = Version.readVersion(in); @@ -930,7 +926,7 @@ public void onRequestReceived(long requestId, String action) { } } - public RequestHandlerRegistry getRequestHandler(String action) { + public RequestHandlerRegistry getRequestHandler(String action) { return transport.getRequestHandler(action); } @@ -977,8 +973,8 @@ private void checkForTimeout(long requestId) { @Override public void onConnectionClosed(Transport.Connection connection) { try { - List pruned = responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection - .getCacheKey())); + List> pruned = + responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey())); // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows getExecutorService().execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java index f685be02141ad..fbfe0e497017f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java @@ -83,8 +83,7 @@ public void testSerialization() throws Exception { clusterSearchShardsResponse.writeTo(out); try(StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) { in.setVersion(version); - ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(); - deserialized.readFrom(in); + ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(in); assertArrayEquals(clusterSearchShardsResponse.getNodes(), deserialized.getNodes()); assertEquals(clusterSearchShardsResponse.getGroups().length, deserialized.getGroups().length); for (int i = 0; i < clusterSearchShardsResponse.getGroups().length; i++) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index c763709a04e40..e529af97c800d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -254,7 +254,7 @@ public void testBuildClusters() { remoteIndices.put(cluster, randomOriginalIndices()); if (onlySuccessful || randomBoolean()) { //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse()); + searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); successful++; } else { searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index d64cdf89ef7ea..41d691c95bd90 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -253,8 +253,8 @@ public void onFailure(Exception e) { iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) { + return new TestResponse(in); } @Override @@ -435,5 +435,7 @@ public static class TestRequest extends TransportRequest { private static class TestResponse extends TransportResponse { + private TestResponse() {} + private TestResponse(StreamInput in) {} } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 46364c19ee0ec..6c27680d74162 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -172,9 +172,7 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -215,9 +213,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -233,9 +229,7 @@ public ClusterSearchShardsResponse read(StreamInput in) throws IOException { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index 428d416ac0242..b8917b02e8982 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -86,8 +86,7 @@ public void testSendMessage() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -96,8 +95,7 @@ public void testSendMessage() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -105,8 +103,7 @@ public void testSendMessage() throws InterruptedException { serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_C"; + SimpleTestResponse response = new SimpleTestResponse("TS_C"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceC, "internal:test", SimpleTestResponse::new); @@ -115,8 +112,8 @@ public void testSendMessage() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -149,8 +146,7 @@ public void testException() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -159,8 +155,7 @@ public void testException() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -175,8 +170,8 @@ public void testException() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -228,11 +223,19 @@ public void writeTo(StreamOutput out) throws IOException { } public static class SimpleTestResponse extends TransportResponse { - String targetNode; + final String targetNode; + + SimpleTestResponse(String targetNode) { + this.targetNode = targetNode; + } + + SimpleTestResponse(StreamInput in) throws IOException { + this.targetNode = in.readString(); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - targetNode = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -263,7 +266,7 @@ public void testIsProxyAction() { } public void testIsProxyRequest() { - assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null))); + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(TransportRequest.Empty.INSTANCE, null))); assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 132a07d5b7f48..1b8405a2d591a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; @@ -163,8 +164,10 @@ public void clear() { /** * simulate a response for the given requestId */ - public void handleResponse(final long requestId, final TransportResponse response) { - responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); + public void handleResponse(final long requestId, final Response response) { + TransportResponseHandler handler = + (TransportResponseHandler) responseHandlers.onResponseReceived(requestId, listener); + handler.handleResponse(response); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3b64f00084ec8..592de73cbfc74 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -219,8 +219,8 @@ public void testHelloWorld() { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -250,8 +250,8 @@ public void handleException(TransportException exp) { res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -298,8 +298,8 @@ public void testThreadContext() throws ExecutionException, InterruptedException final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0])); TransportResponseHandler responseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -353,8 +353,8 @@ public void testLocalNodeConnection() throws InterruptedException { serviceA.sendRequest(nodeA, "internal:localNode", new StringMessageRequest("test"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -502,7 +502,7 @@ public void testVoidMessageCompressed() { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -550,8 +550,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -592,8 +592,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHelloException", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -644,7 +644,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti serviceA.registerRequestHandler("internal:test", TestRequest::new, randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { logger.info("caught exception while responding", e); responseErrors.add(e); @@ -652,7 +652,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti }); final TransportRequestHandler ignoringRequestHandler = (request, channel, task) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { // we don't really care what's going on B, we're testing through A logger.trace("caught exception while responding from node B", e); @@ -808,8 +808,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -872,8 +872,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -910,8 +910,8 @@ public void handleException(TransportException exp) { new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -961,8 +961,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportResponseHandler noopResponseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1160,19 +1160,19 @@ public void writeTo(StreamOutput out) throws IOException { static class StringMessageResponse extends TransportResponse { - private String message; + private final String message; StringMessageResponse(String message) { this.message = message; } - StringMessageResponse() { + StringMessageResponse(StreamInput in) throws IOException { + this.message = in.readString(); } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - message = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1224,12 +1224,19 @@ public void writeTo(StreamOutput out) throws IOException { static class Version0Response extends TransportResponse { - int value1; + final int value1; + + Version0Response(int value1) { + this.value1 = value1; + } + + Version0Response(StreamInput in) throws IOException { + this.value1 = in.readInt(); + } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - value1 = in.readInt(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1241,16 +1248,27 @@ public void writeTo(StreamOutput out) throws IOException { static class Version1Response extends Version0Response { - int value2; + final int value2; - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + Version1Response(int value1, int value2) { + super(value1); + this.value2 = value2; + } + + Version1Response(StreamInput in) throws IOException { + super(in); if (in.getVersion().onOrAfter(version1)) { value2 = in.readInt(); + } else { + value2 = 0; } } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -1267,9 +1285,7 @@ public void testVersionFrom0to1() throws Exception { public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(0)); // not set, coming from service A - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1280,8 +1296,8 @@ public void messageReceived(Version1Request request, TransportChannel channel, T Version0Response version0Response = serviceA.submitRequest(nodeB, "internal:version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1310,8 +1326,7 @@ public void testVersionFrom1to0() throws Exception { @Override public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1323,8 +1338,8 @@ public void messageReceived(Version0Request request, TransportChannel channel, T Version1Response version1Response = serviceB.submitRequest(nodeA, "internal:version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1354,9 +1369,7 @@ public void testVersionFrom1to1() throws Exception { (request, channel, task) -> { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(2)); - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version1, channel.getVersion()); }); @@ -1367,8 +1380,8 @@ public void testVersionFrom1to1() throws Exception { Version1Response version1Response = serviceB.submitRequest(nodeB, "internal:version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1397,8 +1410,7 @@ public void testVersionFrom0to0() throws Exception { serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME, (request, channel, task) -> { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); }); @@ -1408,8 +1420,8 @@ public void testVersionFrom0to0() throws Exception { Version0Response version0Response = serviceA.submitRequest(nodeA, "internal:version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1444,8 +1456,8 @@ public void testMockFailToSendNoConnectRule() throws Exception { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1502,8 +1514,8 @@ public void testMockUnresponsiveRule() throws IOException { new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1547,13 +1559,13 @@ public void testHostOnMessages() throws InterruptedException { final AtomicReference addressB = new AtomicReference<>(); serviceB.registerRequestHandler("internal:action1", TestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { addressA.set(request.remoteAddress()); - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); latch.countDown(); }); serviceA.sendRequest(nodeB, "internal:action1", new TestRequest(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1600,8 +1612,8 @@ public void testBlockingIncomingRequests() throws Exception { serviceA.sendRequest(connection, "internal:action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1666,9 +1678,10 @@ public String toString() { private static class TestResponse extends TransportResponse { - String info; + final String info; - TestResponse() { + TestResponse(StreamInput in) throws IOException { + this.info = in.readOptionalString(); } TestResponse(String info) { @@ -1677,8 +1690,7 @@ private static class TestResponse extends TransportResponse { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - info = in.readOptionalString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1763,8 +1775,8 @@ public void messageReceived(TestRequest request, TransportChannel channel, Task TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1820,8 +1832,8 @@ class TestResponseHandler implements TransportResponseHandler { } @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -2086,7 +2098,7 @@ public void testResponseHeadersArePreserved() throws InterruptedException { TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2140,7 +2152,7 @@ public void testHandlerIsInvokedOnConnectionClose() throws IOException, Interrup CountDownLatch latch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2217,7 +2229,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2285,7 +2297,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2399,7 +2411,7 @@ protected void doRun() throws Exception { AtomicReference receivedException = new AtomicReference<>(null); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index a7351ccfe14d1..1b85049da235b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -335,7 +336,7 @@ public void testContextRestoreResponseHandler() throws Exception { threadContext.wrapRestorable(storedContext), new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } @@ -374,7 +375,7 @@ public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exc new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java index abd5768bebec9..6ff18cc77a1e2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -176,8 +177,12 @@ public void testThatConnectionToClientTypeConnectionIsRejected() throws IOExcept TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TransportResponse newInstance() { - fail("never get that far"); + public TransportResponse read(StreamInput in) { + try { + fail("never get that far"); + } finally { + latch.countDown(); + } return null; } From 3da7aabba1f73b2051248b86a5e36e1f7264a151 Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:11:17 -0600 Subject: [PATCH 2/8] call super and fix test --- .../java/org/elasticsearch/transport/TransportActionProxy.java | 1 + .../org/elasticsearch/transport/TransportActionProxyTests.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index d65343a1a130f..a5b926249f8e2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -109,6 +109,7 @@ static class ProxyRequest extends TransportRequest { } ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index b8917b02e8982..7d52c12e47364 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -128,7 +128,7 @@ public void handleResponse(SimpleTestResponse response) { @Override public void handleException(TransportException exp) { try { - throw new AssertionError(exp); + throw new AssertionError(exp); } finally { latch.countDown(); } @@ -230,6 +230,7 @@ public static class SimpleTestResponse extends TransportResponse { } SimpleTestResponse(StreamInput in) throws IOException { + super(in); this.targetNode = in.readString(); } From 0fbe158e746295aaac2bc80e923305a07fa13abd Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:12:30 -0600 Subject: [PATCH 3/8] move methods --- .../shards/ClusterSearchShardsResponse.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index 141bcd86f1252..57407bd61fb82 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -61,25 +61,6 @@ public ClusterSearchShardsResponse(StreamInput in) throws IOException { } } - public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, - Map indicesAndFilters) { - this.groups = groups; - this.nodes = nodes; - this.indicesAndFilters = indicesAndFilters; - } - - public ClusterSearchShardsGroup[] getGroups() { - return groups; - } - - public DiscoveryNode[] getNodes() { - return nodes; - } - - public Map getIndicesAndFilters() { - return indicesAndFilters; - } - @Override public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); @@ -103,6 +84,25 @@ public void writeTo(StreamOutput out) throws IOException { } } + public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, + Map indicesAndFilters) { + this.groups = groups; + this.nodes = nodes; + this.indicesAndFilters = indicesAndFilters; + } + + public ClusterSearchShardsGroup[] getGroups() { + return groups; + } + + public DiscoveryNode[] getNodes() { + return nodes; + } + + public Map getIndicesAndFilters() { + return indicesAndFilters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); From 416095cd1ac233d0f4214244635b600b69190433 Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:16:11 -0600 Subject: [PATCH 4/8] fix javadoc --- .../action/support/master/TransportMasterNodeAction.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 2b369c2591e54..2fda1d5f698ae 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -104,8 +105,10 @@ protected TransportMasterNodeAction(Settings settings, String actionName, boolea protected abstract String executor(); /** - * Returns a new response instance for serialization. This is deprecated; new implementors - * should override {@link #read(StreamInput)} and use the {@link Writeable.Reader} interface. + * @deprecated new implementors should override {@link #read(StreamInput)} and use the + * {@link Writeable.Reader} interface. + * @return a new response instance. Typically this is used for serialization using the + * {@link Streamable#readFrom(StreamInput)} method. */ @Deprecated protected abstract Response newResponse(); From 66f1699aab2377cf419582bcfbafc50dabff8023 Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:17:04 -0600 Subject: [PATCH 5/8] fix indent --- .../master/TransportMasterNodeAction.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 2fda1d5f698ae..10780a55c27bf 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -219,19 +219,19 @@ protected void doRun() throws Exception { final String actionName = getMasterActionName(masterNode); transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, TransportMasterNodeAction.this::read) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request with action name [{}] to " + - "master node [{}], scheduling a retry. Error: [{}]", - actionName, nodes.getMasterNode(), exp.getDetailedMessage()); - retry(cause, masterChangePredicate); - } else { - listener.onFailure(exp); + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to forward request with action name [{}] to " + + "master node [{}], scheduling a retry. Error: [{}]", + actionName, nodes.getMasterNode(), exp.getDetailedMessage()); + retry(cause, masterChangePredicate); + } else { + listener.onFailure(exp); + } } - } }); } } From 1a046395bffad9babe24df13b6ccf4d90189a01a Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:18:36 -0600 Subject: [PATCH 6/8] call super ctor --- .../org/elasticsearch/discovery/zen/NodesFaultDetection.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index d3d5e74a17682..40bde9ee81d15 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -226,7 +226,7 @@ public void run() { .withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler() { @Override - public PingResponse read(StreamInput in) { + public PingResponse read(StreamInput in) throws IOException { return new PingResponse(in); } @@ -359,7 +359,8 @@ private static class PingResponse extends TransportResponse { private PingResponse() { } - private PingResponse(StreamInput in) { + private PingResponse(StreamInput in) throws IOException { + super(in); } } } From b2d1fa733d0cf38c52f9d0b807bd0071fb23d550 Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:20:10 -0600 Subject: [PATCH 7/8] call super ctor --- .../elasticsearch/discovery/zen/MasterFaultDetection.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 7ae1ce4d7f555..b48ea77e64c75 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -225,7 +225,7 @@ public void run() { transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new TransportResponseHandler() { @Override - public MasterPingResponseResponse read(StreamInput in) { + public MasterPingResponseResponse read(StreamInput in) throws IOException { return new MasterPingResponseResponse(in); } @@ -433,7 +433,8 @@ private static class MasterPingResponseResponse extends TransportResponse { private MasterPingResponseResponse() { } - private MasterPingResponseResponse(StreamInput in) { + private MasterPingResponseResponse(StreamInput in) throws IOException { + super(in); } } } From 32a3d6853c7b6c8df30f949d6f5d43128e878fde Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 22 Oct 2018 12:35:39 -0600 Subject: [PATCH 8/8] add javadoc for constructors --- .../org/elasticsearch/transport/TransportMessage.java | 7 +++++++ .../org/elasticsearch/transport/TransportResponse.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index 4c328ba65c01d..05deab8eafbf0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -39,9 +39,16 @@ public TransportAddress remoteAddress() { return remoteAddress; } + /** + * Constructs a new empty transport message + */ public TransportMessage() { } + /** + * Constructs a new transport message with the data from the {@link StreamInput}. This is + * currently a no-op + */ public TransportMessage(StreamInput in) throws IOException { } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java index 7786eca9d3610..5ad9c9fee544e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -25,9 +25,17 @@ public abstract class TransportResponse extends TransportMessage { + /** + * Constructs a new empty transport response + */ public TransportResponse() { } + /** + * Constructs a new transport response with the data from the {@link StreamInput}. This is + * currently a no-op. However, this exists to allow extenders to call super(in) + * so that reading can mirror writing where we often call super.writeTo(out). + */ public TransportResponse(StreamInput in) throws IOException { super(in); }