Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ private class ShardActiveResponseHandler implements TransportResponseHandler<Sha
}

@Override
public ShardActiveResponse newInstance() {
return new ShardActiveResponse();
public ShardActiveResponse read(StreamInput in) throws IOException {
return new ShardActiveResponse(in);
}

@Override
Expand Down Expand Up @@ -417,20 +417,15 @@ public void writeTo(StreamOutput out) throws IOException {

private static class ShardActiveResponse extends TransportResponse {

private boolean shardActive;
private DiscoveryNode node;

ShardActiveResponse() {
}
private final boolean shardActive;
private final DiscoveryNode node;

ShardActiveResponse(boolean shardActive, DiscoveryNode node) {
this.shardActive = shardActive;
this.node = node;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ShardActiveResponse(StreamInput in) throws IOException {
shardActive = in.readBoolean();
node = new DiscoveryNode(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.BaseFuture;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -70,8 +72,8 @@ public V txGet(long timeout, TimeUnit unit) {
}

@Override
public V newInstance() {
return handler.newInstance();
public V read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,13 +1432,13 @@ static void ensureVersionCompatibility(Version version, Version currentVersion,
}

private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new TransportAddress(remoteAddress));
final TransportResponse response;
try {
response.readFrom(stream);
response = handler.read(stream);
response.remoteAddress(new TransportAddress(remoteAddress));
} catch (Exception e) {
handleException(handler, new TransportSerializationException(
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
"Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
return;
}
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,34 @@

package org.elasticsearch.transport;

public interface TransportResponseHandler<T extends TransportResponse> {
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

public interface TransportResponseHandler<T extends TransportResponse> extends Writeable.Reader<T> {

/**
* @deprecated Implement {@link #read(StreamInput)} instead.
*/
@Deprecated
default T newInstance() {
throw new UnsupportedOperationException();
}

/**
* creates a new instance of the return type from the remote call.
* called by the infra before de-serializing the response.
* deserializes a new instance of the return type from the stream.
* called by the infra when de-serializing the response.
*
* @return a new response copy.
* @return the deserialized response.
*/
T newInstance();
@SuppressWarnings("deprecation")
@Override
default T read(StreamInput in) throws IOException {
T instance = newInstance();
instance.readFrom(in);
return instance;
}

void handleResponse(T response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,8 +1079,8 @@ public ContextRestoreResponseHandler(Supplier<ThreadContext.StoredContext> conte
}

@Override
public T newInstance() {
return delegate.newInstance();
public T read(StreamInput in) throws IOException {
return delegate.read(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -176,8 +176,8 @@ private <T extends TransportResponse> TransportResponseHandler wrapLivenessRespo
ClusterName clusterName) {
return new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -899,8 +900,8 @@ protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(P
TransportResponseHandler<UnicastPingResponse> original = super.getPingResponseHandler(pingingRound, node);
return new TransportResponseHandler<UnicastPingResponse>() {
@Override
public UnicastPingResponse newInstance() {
return original.newInstance();
public UnicastPingResponse read(StreamInput in) throws IOException {
return original.read(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -100,8 +102,8 @@ public <T extends TransportResponse> void sendRequest(Transport.Connection conne
assertVersionSerializable(request);
sender.sendRequest(connection, action, request, options, new TransportResponseHandler<T>() {
@Override
public T newInstance() {
return handler.newInstance();
public T read(StreamInput in) throws IOException {
return handler.read(in);
}

@Override
Expand Down