From 9a1e89a293a8afd3355746945ac3af891c5cc079 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 28 Dec 2017 10:12:24 +0100 Subject: [PATCH] Add Writeable.Reader support to TransportResponseHandler --- .../indices/store/IndicesStore.java | 15 ++++------ .../transport/PlainTransportFuture.java | 6 ++-- .../elasticsearch/transport/TcpTransport.java | 8 ++--- .../transport/TransportResponseHandler.java | 29 +++++++++++++++---- .../transport/TransportService.java | 4 +-- .../TransportClientNodesServiceTests.java | 6 ++-- .../discovery/zen/UnicastZenPingTests.java | 5 ++-- .../AssertingTransportInterceptor.java | 6 ++-- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 2ae8d12a9fe9f..294484c659863 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -238,8 +238,8 @@ private class ShardActiveResponseHandler implements TransportResponseHandler { +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +public interface TransportResponseHandler extends Writeable.Reader { + + /** + * @deprecated Implement {@link #read(StreamInput)} instead. + */ + @Deprecated + default T newInstance() { + throw new UnsupportedOperationException(); + } /** - * creates a new instance of the return type from the remote call. - * called by the infra before de-serializing the response. + * deserializes a new instance of the return type from the stream. + * called by the infra when de-serializing the response. * - * @return a new response copy. + * @return the deserialized response. */ - T newInstance(); + @SuppressWarnings("deprecation") + @Override + default T read(StreamInput in) throws IOException { + T instance = newInstance(); + instance.readFrom(in); + return instance; + } void handleResponse(T response); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 2f87deb3bd759..a59ffcaa872d2 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1079,8 +1079,8 @@ public ContextRestoreResponseHandler(Supplier conte } @Override - public T newInstance() { - return delegate.newInstance(); + public T read(StreamInput in) throws IOException { + return delegate.read(in); } @Override diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index b120c7a3e7dd3..ad894906cfb2d 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -176,8 +176,8 @@ private TransportResponseHandler wrapLivenessRespo ClusterName clusterName) { return new TransportResponseHandler() { @Override - public T newInstance() { - return handler.newInstance(); + public T read(StreamInput in) throws IOException { + return handler.read(in); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index d40d558d20b6b..e0593a694d0b4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -899,8 +900,8 @@ protected TransportResponseHandler getPingResponseHandler(P TransportResponseHandler original = super.getPingResponseHandler(pingingRound, node); return new TransportResponseHandler() { @Override - public UnicastPingResponse newInstance() { - return original.newInstance(); + public UnicastPingResponse read(StreamInput in) throws IOException { + return original.read(in); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java index cbe2006bf74f8..bbb6c9567362d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -30,6 +31,7 @@ import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Random; @@ -100,8 +102,8 @@ public void sendRequest(Transport.Connection conne assertVersionSerializable(request); sender.sendRequest(connection, action, request, options, new TransportResponseHandler() { @Override - public T newInstance() { - return handler.newInstance(); + public T read(StreamInput in) throws IOException { + return handler.read(in); } @Override